From 885f92c9fe90851ea631dcccde12ec22a5c04ec9 Mon Sep 17 00:00:00 2001
From: Jason Liu
Date: Wed, 20 Dec 2023 17:33:19 -0500
Subject: [PATCH 1/7] improve uploads

---
 applications/wikipedia/main.py | 56 +++++++++++++++++++++-------------
 1 file changed, 34 insertions(+), 22 deletions(-)

diff --git a/applications/wikipedia/main.py b/applications/wikipedia/main.py
index 084b28e..147ce69 100644
--- a/applications/wikipedia/main.py
+++ b/applications/wikipedia/main.py
@@ -24,7 +24,7 @@
 data_dir = f"{cache_dir}/{dataset_name}"
 DATA_PATH = Path(data_dir)
 
-PUSH_TO_HUB = False
+PUSH_TO_HUB = True
 dataset_name = f"567-labs/wikipedia-embedding-{MODEL_SLUG}-sample"
 dataset_file = "wiki-embeddings.parquet"
 
@@ -145,7 +145,7 @@ async def embed(self, chunks):
 
 
 @stub.function(
-    image=Image.debian_slim().pip_install("datasets", "pyarrow", "tqdm"),
+    image=Image.debian_slim().pip_install("datasets", "pyarrow", "tqdm", "hf_transfer"),
     volumes={cache_dir: volume},
     timeout=84600,
     secret=Secret.from_name("huggingface-credentials"),
@@ -186,7 +186,13 @@ def embed_dataset(down_scale: float = 0.005, batch_size: int = 512 * 50):
     start = time.perf_counter()
     acc_chunks = []
     embeddings = []
-    for batch_chunks, batch_embeddings in model.embed.map(batches, order_outputs=False):
+    for resp in model.embed.map(batches, order_outputs=False, return_exceptions=True):
+        if isinstance(resp, Exception):
+            print(f"Exception: {resp}")
+            continue
+
+        batch_chunks, batch_embeddings = resp
+
         acc_chunks.extend(batch_chunks)
         embeddings.extend(batch_embeddings)
 
@@ -207,29 +213,35 @@ def embed_dataset(down_scale: float = 0.005, batch_size: int = 512 * 50):
         "extrapolated_duration": extrapolated_duration_cps_fmt,
     }
 
+    print(json.dumps(resp, indent=2))
+
     if PUSH_TO_HUB:
-        print(f"Pushing to hub {dataset_name}")
-        table = pa.Table.from_arrays(
-            [
-                pa.array([chunk[0] for chunk in acc_chunks]),  # id
-                pa.array([chunk[1] for chunk in acc_chunks]),  # url
-                pa.array([chunk[2] for chunk in acc_chunks]),  # title
-                pa.array([chunk[3] for chunk in acc_chunks]),  # text
-                pa.array(embeddings),
-            ],
-            names=["id", "url", "title", "text", "embedding"],
-        )
-        pq.write_table(table, dataset_file)
-        dataset = load_dataset("parquet", data_files=dataset_file)
-        dataset.push_to_hub(dataset_name, token=os.environ["HUGGINGFACE_TOKEN"])
+        try:
+            print(f"Pushing to hub {dataset_name}")
+            table = pa.Table.from_arrays(
+                [
+                    pa.array([chunk[0] for chunk in acc_chunks]),  # id
+                    pa.array([chunk[1] for chunk in acc_chunks]),  # url
+                    pa.array([chunk[2] for chunk in acc_chunks]),  # title
+                    pa.array([chunk[3] for chunk in acc_chunks]),  # text
+                    pa.array(embeddings),
+                ],
+                names=["id", "url", "title", "text", "embedding"],
+            )
+            pq.write_table(table, dataset_file)
+            volumn.commit()
+            dataset = load_dataset("parquet", data_files=dataset_file)
+            dataset.push_to_hub(dataset_name, token=os.environ["HUGGINGFACE_TOKEN"])
+        except Exception as e:
+            print(e)
 
     return resp
 
 
 @stub.local_entrypoint()
 def main():
-    for scale, batch_size in product([0.25], [512 * 50]):
-        with open("benchmarks.json", "a") as f:
-            benchmark = embed_dataset.remote(down_scale=scale, batch_size=batch_size)
-            print(json.dumps(benchmark, indent=2))
-            f.write(json.dumps(benchmark, indent=2) + "\n")
+    scale = .25
+    batch_size = 512 * 150
+    with open("benchmarks.json", "a") as f:
+        benchmark = embed_dataset.remote(down_scale=scale, batch_size=batch_size)
+        f.write(json.dumps(benchmark, indent=2) + "\n")
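
Note: patch 1 adds a `volumn.commit()` call inside the new try/except. The volume object is named `volume` everywhere else in main.py (see `volumes={cache_dir: volume}`), so this line raises a NameError at runtime, and because the whole block sits under `except Exception as e: print(e)`, the push to the Hub is silently skipped. The presumed intent is the sketch below; `table`, `volume`, and `dataset_file` are the names already defined in main.py.

import pyarrow.parquet as pq

pq.write_table(table, dataset_file)
volume.commit()  # patch 1 spells this `volumn.commit()`, which is a NameError inside the try/except
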
From e2efe65e4a748e54bc09b5a6cb7de3bf059012b5 Mon Sep 17 00:00:00 2001
From: Jason Liu
Date: Wed, 20 Dec 2023 21:14:46 -0500
Subject: [PATCH 2/7] try hf_transfer

---
 applications/wikipedia/main.py | 23 ++++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/applications/wikipedia/main.py b/applications/wikipedia/main.py
index 147ce69..98e7e50 100644
--- a/applications/wikipedia/main.py
+++ b/applications/wikipedia/main.py
@@ -145,7 +145,9 @@ async def embed(self, chunks):
 
 
 @stub.function(
-    image=Image.debian_slim().pip_install("datasets", "pyarrow", "tqdm", "hf_transfer"),
+    image=Image.debian_slim().pip_install(
+        "datasets", "pyarrow", "tqdm", "hf_transfer", "huggingface_hub"
+    ),
     volumes={cache_dir: volume},
     timeout=84600,
     secret=Secret.from_name("huggingface-credentials"),
@@ -229,9 +231,20 @@ def embed_dataset(down_scale: float = 0.005, batch_size: int = 512 * 50):
                 names=["id", "url", "title", "text", "embedding"],
             )
             pq.write_table(table, dataset_file)
-            volumn.commit()
-            dataset = load_dataset("parquet", data_files=dataset_file)
-            dataset.push_to_hub(dataset_name, token=os.environ["HUGGINGFACE_TOKEN"])
+
+            print(f"Uploading to hub {dataset_name}")
+            from huggingface_hub import HfApi, logging
+
+            logging.set_verbosity_debug()
+            hf = HfApi()
+            # ! This is not working but should be
+            hf.upload_file(
+                path_or_fileobj=dataset_file,
+                path_in_repo=dataset_file,
+                repo_id="jxnlco/modal-wikipedia",
+                repo_type="dataset",
+            )
+
         except Exception as e:
             print(e)
 
@@ -240,7 +253,7 @@ def embed_dataset(down_scale: float = 0.005, batch_size: int = 512 * 50):
 
 @stub.local_entrypoint()
 def main():
-    scale = .25
+    scale = 0.01
     batch_size = 512 * 150
     with open("benchmarks.json", "a") as f:
         benchmark = embed_dataset.remote(down_scale=scale, batch_size=batch_size)
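
Note: patch 2 installs hf_transfer and switches to `HfApi().upload_file`, flagged "! This is not working but should be". Two plausible gaps, offered as assumptions rather than a confirmed diagnosis: huggingface_hub only routes uploads through hf_transfer when HF_HUB_ENABLE_HF_TRANSFER=1 is set in the environment (and the flag is read at import time), and the call never passes the token that the `huggingface-credentials` secret provides. A hedged sketch of the same upload with both addressed:

# Hedged sketch, not the repo's confirmed fix. Assumes HUGGINGFACE_TOKEN is set
# by the Modal secret and that the parquet file sits at `dataset_file`, as in patch 2.
import os

os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"  # must be set before importing huggingface_hub

from huggingface_hub import HfApi

dataset_file = "wiki-embeddings.parquet"

api = HfApi(token=os.environ["HUGGINGFACE_TOKEN"])
api.upload_file(
    path_or_fileobj=dataset_file,
    path_in_repo=dataset_file,
    repo_id="jxnlco/modal-wikipedia",
    repo_type="dataset",
)
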
From 11b391fb3db90fbd3f9938cccdcc649cc00ed209 Mon Sep 17 00:00:00 2001
From: Jason Liu
Date: Thu, 21 Dec 2023 09:05:16 -0500
Subject: [PATCH 3/7] update

---
 applications/wikipedia/main.py | 19 ++++---------------
 1 file changed, 4 insertions(+), 15 deletions(-)

diff --git a/applications/wikipedia/main.py b/applications/wikipedia/main.py
index 98e7e50..d9817de 100644
--- a/applications/wikipedia/main.py
+++ b/applications/wikipedia/main.py
@@ -149,6 +149,7 @@ async def embed(self, chunks):
         "datasets", "pyarrow", "tqdm", "hf_transfer", "huggingface_hub"
     ),
     volumes={cache_dir: volume},
+    _allow_background_volume_commits=True,
     timeout=84600,
     secret=Secret.from_name("huggingface-credentials"),
 )
@@ -231,20 +232,8 @@ def embed_dataset(down_scale: float = 0.005, batch_size: int = 512 * 50):
                 names=["id", "url", "title", "text", "embedding"],
             )
             pq.write_table(table, dataset_file)
-
-            print(f"Uploading to hub {dataset_name}")
-            from huggingface_hub import HfApi, logging
-
-            logging.set_verbosity_debug()
-            hf = HfApi()
-            # ! This is not working but should be
-            hf.upload_file(
-                path_or_fileobj=dataset_file,
-                path_in_repo=dataset_file,
-                repo_id="jxnlco/modal-wikipedia",
-                repo_type="dataset",
-            )
-
+            # This is now saved to the volume, so we can just push the volume.
+            # and upload the dataset to the hub in a separate step.
         except Exception as e:
             print(e)
 
@@ -253,7 +242,7 @@ def embed_dataset(down_scale: float = 0.005, batch_size: int = 512 * 50):
 
 @stub.local_entrypoint()
 def main():
-    scale = 0.01
+    scale = 0.10
     batch_size = 512 * 150
     with open("benchmarks.json", "a") as f:
         benchmark = embed_dataset.remote(down_scale=scale, batch_size=batch_size)

From d3e6ce481221b026d74ff1b3eb42c042fb0adcc6 Mon Sep 17 00:00:00 2001
From: Jason Liu
Date: Thu, 21 Dec 2023 09:05:32 -0500
Subject: [PATCH 4/7] benchmarks

---
 applications/wikipedia/benchmarks.json | 72 ++++++++++++++++++++++++++
 1 file changed, 72 insertions(+)

diff --git a/applications/wikipedia/benchmarks.json b/applications/wikipedia/benchmarks.json
index 7b73796..a36e73b 100644
--- a/applications/wikipedia/benchmarks.json
+++ b/applications/wikipedia/benchmarks.json
@@ -511,3 +511,75 @@
   "characters_per_sec": 23222100,
   "extrapolated_duration": "0:14:02.324293"
 }
+{
+  "downscale": 0.01,
+  "batch_size": 76800,
+  "n_gpu": 100,
+  "duration_mins": 1.0392728034499998,
+  "characters_per_sec": 14384533,
+  "extrapolated_duration": "0:22:39.831352"
+}
+{
+  "downscale": 0.01,
+  "batch_size": 76800,
+  "n_gpu": 100,
+  "duration_mins": 0.7878980491166666,
+  "characters_per_sec": 18973843,
+  "extrapolated_duration": "0:17:10.921303"
+}
+{
+  "downscale": 0.01,
+  "batch_size": 76800,
+  "n_gpu": 100,
+  "duration_mins": 0.8039170896166667,
+  "characters_per_sec": 18595766,
+  "extrapolated_duration": "0:17:31.881324"
+}
+{
+  "downscale": 0.001,
+  "batch_size": 76800,
+  "n_gpu": 100,
+  "duration_mins": 0.6121226231833333,
+  "characters_per_sec": 3817226,
+  "extrapolated_duration": "1:25:24.281077"
+}
+{
+  "downscale": 0.001,
+  "batch_size": 76800,
+  "n_gpu": 100,
+  "duration_mins": 0.584016127,
+  "characters_per_sec": 4000934,
+  "extrapolated_duration": "1:21:28.993159"
+}
+{
+  "downscale": 0.001,
+  "batch_size": 76800,
+  "n_gpu": 100,
+  "duration_mins": 0.5205063897,
+  "characters_per_sec": 4489110,
+  "extrapolated_duration": "1:12:37.331176"
+}
+{
+  "downscale": 0.001,
+  "batch_size": 76800,
+  "n_gpu": 100,
+  "duration_mins": 0.5561301178666667,
+  "characters_per_sec": 4201553,
+  "extrapolated_duration": "1:17:35.549735"
+}
+{
+  "downscale": 0.001,
+  "batch_size": 76800,
+  "n_gpu": 100,
+  "duration_mins": 0.5417734046666667,
+  "characters_per_sec": 4312892,
+  "extrapolated_duration": "1:15:35.364891"
+}
+{
+  "downscale": 1,
+  "batch_size": 76800,
+  "n_gpu": 100,
+  "duration_mins": 14.754031291433334,
+  "characters_per_sec": 21734383,
+  "extrapolated_duration": "0:14:59.981332"
+}
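
Note: benchmarks.json is grown by appending `json.dumps(benchmark, indent=2)` per run, so the file is a stream of concatenated pretty-printed objects rather than a JSON array, and a plain `json.load` will reject it. A small sketch for reading it back (the parsing helper is mine, not part of the repo):

import json

def read_benchmarks(path="benchmarks.json"):
    # Parse a file of back-to-back JSON objects using raw_decode, which returns
    # the offset where each object ends.
    decoder = json.JSONDecoder()
    text = open(path).read()
    records, idx = [], 0
    while idx < len(text):
        while idx < len(text) and text[idx].isspace():
            idx += 1  # skip whitespace between objects
        if idx == len(text):
            break
        obj, idx = decoder.raw_decode(text, idx)
        records.append(obj)
    return records

Judging from the appended records, the small down_scale runs extrapolate much worse (over an hour at 0.001) than the full run (about 15 minutes at 1), which is consistent with fixed startup overhead dominating short runs.
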
From e58ad4c71bb7fe387d4a8f012dd37b09783edabd Mon Sep 17 00:00:00 2001
From: Jason Liu
Date: Thu, 21 Dec 2023 10:00:39 -0500
Subject: [PATCH 5/7] split uploads

---
 applications/wikipedia/main.py   |  3 +-
 applications/wikipedia/upload.py | 47 ++++++++++++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)
 create mode 100644 applications/wikipedia/upload.py

diff --git a/applications/wikipedia/main.py b/applications/wikipedia/main.py
index d9817de..5fb29f4 100644
--- a/applications/wikipedia/main.py
+++ b/applications/wikipedia/main.py
@@ -231,9 +231,10 @@ def embed_dataset(down_scale: float = 0.005, batch_size: int = 512 * 50):
                 ],
                 names=["id", "url", "title", "text", "embedding"],
             )
-            pq.write_table(table, dataset_file)
+            pq.write_table(table, f"{cache_dir}/{dataset_file}")
             # This is now saved to the volume, so we can just push the volume.
             # and upload the dataset to the hub in a separate step.
+
         except Exception as e:
             print(e)
 
diff --git a/applications/wikipedia/upload.py b/applications/wikipedia/upload.py
new file mode 100644
index 0000000..647fb62
--- /dev/null
+++ b/applications/wikipedia/upload.py
@@ -0,0 +1,47 @@
+from pathlib import Path
+
+from modal import Image, Stub, Volume, Secret
+
+MODEL_ID = "BAAI/bge-small-en-v1.5"
+MODEL_SLUG = MODEL_ID.split("/")[-1]
+
+BATCH_SIZE = 512
+DOCKER_IMAGE = (
+    "ghcr.io/huggingface/text-embeddings-inference:86-0.4.0"  # Ampere 86 for A10s.
+    # "ghcr.io/huggingface/text-embeddings-inference:0.4.0"  # Ampere 80 for A100s.
+    # "ghcr.io/huggingface/text-embeddings-inference:0.3.0"  # Turing for T4s.
+)
+dataset_name = "wikipedia"
+volume = Volume.persisted("embedding-wikipedia")
+cache_dir = "/data"
+data_dir = f"{cache_dir}/{dataset_name}"
+DATA_PATH = Path(data_dir)
+
+dataset_name = f"567-labs/wikipedia-embedding-{MODEL_SLUG}-sample"
+dataset_file = "wiki-embeddings.parquet"
+
+
+stub = Stub("embeddings")
+
+
+@stub.function(
+    image=Image.debian_slim().pip_install(
+        "datasets", "pyarrow", "tqdm", "hf_transfer", "huggingface_hub"
+    ),
+    volumes={cache_dir: volume},
+    _allow_background_volume_commits=True,
+    timeout=84600,
+    secret=Secret.from_name("huggingface-credentials"),
+)
+def upload_dataset():
+    from datasets import load_dataset
+    import os
+
+    print(f"Pushing to hub {dataset_name}")
+    dataset = load_dataset("parquet", data_files=dataset_file)
+    dataset.push_to_hub(dataset_name, token=os.environ["HUGGINGFACE_TOKEN"])
+
+
+@stub.local_entrypoint()
+def main():
+    upload_dataset.remote()
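
Note: patch 5 moves the Hub push into a separate Modal app (upload.py) that mounts the same persisted volume, "embedding-wikipedia", at /data. As written, upload_dataset still reads `dataset_file` from the container's working directory rather than the volume; patch 7 below fixes that path. A hedged sketch of the corrected read, with a volume refresh added on the assumption that the installed Modal version exposes `Volume.reload()`; stub, volume, and path constants are the ones defined in upload.py:

@stub.function(
    image=Image.debian_slim().pip_install("datasets", "pyarrow", "hf_transfer", "huggingface_hub"),
    volumes={cache_dir: volume},
    secret=Secret.from_name("huggingface-credentials"),
)
def upload_dataset():
    import os
    from datasets import load_dataset

    volume.reload()  # assumption: pick up the latest committed state of the shared volume
    print(f"Pushing to hub {dataset_name}")
    dataset = load_dataset("parquet", data_files=f"{cache_dir}/{dataset_file}")
    dataset.push_to_hub(dataset_name, token=os.environ["HUGGINGFACE_TOKEN"])
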
From 33a6ec1f19605ca71bb8ad1d40d4ecde39e1d1dc Mon Sep 17 00:00:00 2001
From: Jason Liu
Date: Thu, 21 Dec 2023 10:04:42 -0500
Subject: [PATCH 6/7] reorg embedding function

---
 applications/wikipedia/main.py | 35 +++++++++++++++-------------------
 1 file changed, 15 insertions(+), 20 deletions(-)

diff --git a/applications/wikipedia/main.py b/applications/wikipedia/main.py
index 5fb29f4..b8e72f9 100644
--- a/applications/wikipedia/main.py
+++ b/applications/wikipedia/main.py
@@ -24,7 +24,7 @@
 data_dir = f"{cache_dir}/{dataset_name}"
 DATA_PATH = Path(data_dir)
 
-PUSH_TO_HUB = True
+SAVE_TO_DISK = True
 dataset_name = f"567-labs/wikipedia-embedding-{MODEL_SLUG}-sample"
 dataset_file = "wiki-embeddings.parquet"
 
@@ -218,25 +218,20 @@ def embed_dataset(down_scale: float = 0.005, batch_size: int = 512 * 50):
 
     print(json.dumps(resp, indent=2))
 
-    if PUSH_TO_HUB:
-        try:
-            print(f"Pushing to hub {dataset_name}")
-            table = pa.Table.from_arrays(
-                [
-                    pa.array([chunk[0] for chunk in acc_chunks]),  # id
-                    pa.array([chunk[1] for chunk in acc_chunks]),  # url
-                    pa.array([chunk[2] for chunk in acc_chunks]),  # title
-                    pa.array([chunk[3] for chunk in acc_chunks]),  # text
-                    pa.array(embeddings),
-                ],
-                names=["id", "url", "title", "text", "embedding"],
-            )
-            pq.write_table(table, f"{cache_dir}/{dataset_file}")
-            # This is now saved to the volume, so we can just push the volume.
-            # and upload the dataset to the hub in a separate step.
-
-        except Exception as e:
-            print(e)
+    if SAVE_TO_DISK:
+        print(f"Creating parquet table...")
+        table = pa.Table.from_arrays(
+            [
+                pa.array([chunk[0] for chunk in acc_chunks]),  # id
+                pa.array([chunk[1] for chunk in acc_chunks]),  # url
+                pa.array([chunk[2] for chunk in acc_chunks]),  # title
+                pa.array([chunk[3] for chunk in acc_chunks]),  # text
+                pa.array(embeddings),
+            ],
+            names=["id", "url", "title", "text", "embedding"],
+        )
+        print(f"Saving to disk at {cache_dir}/{dataset_file}")
+        pq.write_table(table, f"{cache_dir}/{dataset_file}")
 
     return resp

From e0ce69e598164e17499d7b9441bbb4e047eeba35 Mon Sep 17 00:00:00 2001
From: Jason Liu
Date: Thu, 21 Dec 2023 10:24:08 -0500
Subject: [PATCH 7/7] still does not work

---
 applications/wikipedia/main.py   | 3 ++-
 applications/wikipedia/upload.py | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/applications/wikipedia/main.py b/applications/wikipedia/main.py
index b8e72f9..d93debf 100644
--- a/applications/wikipedia/main.py
+++ b/applications/wikipedia/main.py
@@ -232,13 +232,14 @@ def embed_dataset(down_scale: float = 0.005, batch_size: int = 512 * 50):
         )
         print(f"Saving to disk at {cache_dir}/{dataset_file}")
         pq.write_table(table, f"{cache_dir}/{dataset_file}")
+        volume.commit()
 
     return resp
 
 
 @stub.local_entrypoint()
 def main():
-    scale = 0.10
+    scale = 0.01
     batch_size = 512 * 150
     with open("benchmarks.json", "a") as f:
         benchmark = embed_dataset.remote(down_scale=scale, batch_size=batch_size)
diff --git a/applications/wikipedia/upload.py b/applications/wikipedia/upload.py
index 647fb62..71a27c6 100644
--- a/applications/wikipedia/upload.py
+++ b/applications/wikipedia/upload.py
@@ -38,7 +38,7 @@ def upload_dataset():
     import os
 
     print(f"Pushing to hub {dataset_name}")
-    dataset = load_dataset("parquet", data_files=dataset_file)
+    dataset = load_dataset("parquet", data_files=f"{cache_dir}/{dataset_file}")
     dataset.push_to_hub(dataset_name, token=os.environ["HUGGINGFACE_TOKEN"])
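
Note: patch 7 commits the volume right after the parquet write and points upload.py at the volume path, yet the subject line still says it does not work. A cheap way to narrow down whether the handoff between the two apps is the problem is to check, from a function that mounts the same volume, that the file actually landed there. The helper below is hypothetical; the constants are the repo's:

@stub.function(volumes={cache_dir: volume})
def check_volume():
    import os

    path = f"{cache_dir}/{dataset_file}"
    if os.path.exists(path):
        print(f"{path} exists, {os.path.getsize(path)} bytes")
    else:
        print(f"{path} is missing; was volume.commit() reached in embed_dataset?")
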