diff --git a/courses/orchestrator_course/Modal/README.md b/courses/orchestrator_course/Modal/README.md
new file mode 100644
index 0000000..1553698
--- /dev/null
+++ b/courses/orchestrator_course/Modal/README.md
@@ -0,0 +1,58 @@
+Sign up or log in to modal.com.
+
+Download and configure the Python client
+Run the following commands to install the Python library locally:
+
+```
+pip install modal
+python3 -m modal setup
+```
+
+The first command will install the Modal client library on your computer, along with its dependencies.
+
+The second command creates an API token by authenticating through your web browser. It will open a new tab, but you can close it when you are done.
+
+Then follow the instructions in the Modal Quick Start.
+
+Add dlt and the GitHub source module to the image dependencies: https://modal.com/docs/examples/webscraper#add-dependencies
+
+```py
+dlt_image = modal.Image.debian_slim(python_version="3.12").run_commands(
+    "apt-get update",
+    'pip install "dlt[bigquery]"',
+).add_local_python_source("github_pipeline")
+```
+
+
+### Credentials
+
+Secrets are attached directly to functions:
+```py
+@app.function(
+    image=dlt_image,
+    secrets=[modal.Secret.from_name("github-api")]
+)
+def run_pipeline(resource):
+    ...
+```
+
+### Run locally
+```shell
+modal run github_pipeline_modal.py
+```
+
+### Backfilling
+```shell
+modal run github_pipeline_modal_backfill.py --start-date '2025-08-01 05:47:07+00:00' --end-date '2025-09-01 05:47:07+00:00'
+```
+
+### Deploy
+```shell
+modal deploy --name github_scheduled github_pipeline_modal.py
+```
+
+### Deploy in parallel
+
+```shell
+modal deploy --name github_scheduled_parallel github_pipeline_modal_parallel.py
+```
\ No newline at end of file
diff --git a/courses/orchestrator_course/Modal/__init__.py b/courses/orchestrator_course/Modal/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/courses/orchestrator_course/Modal/github_pipeline.py b/courses/orchestrator_course/Modal/github_pipeline.py
new file mode 100644
index 0000000..e83648c
--- /dev/null
+++ b/courses/orchestrator_course/Modal/github_pipeline.py
@@ -0,0 +1,95 @@
+"""dlt REST API source exposing GitHub data for the dlt-hub organisation.
+
+Importing this module builds ``github_source`` at import time, which reads the
+``sources.access_token`` secret; the secret must therefore be available in
+every process that imports it.
+"""
+import dlt
+from dlt.sources.rest_api import RESTAPIConfig, rest_api_source
+
+# Organisation and repository that all resource paths below point at.
+GITHUB_ORG = "dlt-hub"
+GITHUB_REPO = f"{GITHUB_ORG}/dlt"
+
+config: RESTAPIConfig = {
+    "client": {
+        "base_url": "https://api.github.com",
+        "auth": {
+            "token": dlt.secrets["sources.access_token"],
+        },
+        "headers": {
+            "Accept": "application/vnd.github+json",
+            "X-GitHub-Api-Version": "2022-11-28",
+        },
+        "paginator": "header_link",
+    },
+    "resources": [
+        {
+            "name": "repos",
+            "endpoint": {"path": f"orgs/{GITHUB_ORG}/repos"},
+        },
+        {
+            "name": "contributors",
+            "endpoint": {
+                "path": f"repos/{GITHUB_REPO}/contributors",
+            },
+        },
+        {
+            "name": "issues",
+            "endpoint": {
+                "path": f"repos/{GITHUB_REPO}/issues",
+                "params": {
+                    "state": "open",  # Only get open issues
+                    "sort": "updated",
+                    "direction": "desc",
+                    # Placeholder for the incremental cursor value
+                    "since": "{incremental.start_value}",
+                },
+                "incremental": {
+                    "cursor_path": "updated_at",
+                    "initial_value": "2025-03-01T00:00:00Z",
+                },
+            },
+        },
+        {
+            "name": "forks",
+            "endpoint": {
+                "path": f"repos/{GITHUB_REPO}/forks",
+                "params": {
+                    "sort": "oldest",  # Ensures ascending creation order
+                    "per_page": 100,
+                },
+                "incremental": {
+                    "cursor_path": "created_at",
+                    "initial_value": "2025-07-01T00:00:00Z",
+                    "row_order": "asc",
+                },
+            },
+        },
+        {
+            "name": "releases",
+            "endpoint": {
+                "path": f"repos/{GITHUB_REPO}/releases",
+            },
+        },
+    ],
+}
+
+# Built at import time so the Modal entry points can import it directly.
+github_source = rest_api_source(config)
+
+# Only run the pipeline if this script is executed directly
+if __name__ == "__main__":
+    print("Starting pipeline setup...")
+    pipeline = dlt.pipeline(
+        pipeline_name="github_repos_issues",
+        destination="duckdb",
+        dataset_name="github_data",
+        progress="log",  # Report load progress as log output
+    )
+
+    load_info = pipeline.run(github_source)
+
+    print("Pipeline run complete.")
+    print("Load info:", load_info)
+    print(pipeline.last_trace)
diff --git a/courses/orchestrator_course/Modal/github_pipeline_modal.py b/courses/orchestrator_course/Modal/github_pipeline_modal.py
new file mode 100644
index 0000000..40f661c
--- /dev/null
+++ b/courses/orchestrator_course/Modal/github_pipeline_modal.py
@@ -0,0 +1,45 @@
+"""Scheduled Modal app that loads the GitHub source into BigQuery with dlt."""
+import modal
+
+app = modal.App("run-github-pipeline")
+dlt_image = modal.Image.debian_slim(python_version="3.12").run_commands(
+    "apt-get update",
+    'pip install "dlt[bigquery]"',
+).add_local_python_source("github_pipeline")
+
+
+# Runs the whole GitHub source in one Modal function on a one-minute schedule.
+@app.function(
+    image=dlt_image,
+    secrets=[modal.Secret.from_name("github-api"), modal.Secret.from_name("googlecloud-secret-bigquery")],
+    schedule=modal.Period(minutes=1),
+)
+def run_pipeline():
+    """Load every resource of ``github_source`` into BigQuery.
+
+    Returns:
+        The load info object returned by ``pipeline.run``.
+    """
+    # Imported inside the function, presumably so that dlt only has to be
+    # installed in the Modal image and not on the local machine.
+    import dlt
+    from github_pipeline import github_source
+
+    print("Starting pipeline setup...")
+    pipeline = dlt.pipeline(
+        pipeline_name="github_pipeline",
+        destination="bigquery",
+        dataset_name="alena_github_data",
+    )
+    print("Pipeline created.")
+
+    load_info = pipeline.run(github_source)
+    print("Pipeline run complete.")
+    print("Load info:", load_info)
+    return load_info
+
+
+@app.local_entrypoint()
+def main():
+    """Trigger a single remote run of ``run_pipeline`` from the local CLI."""
+    run_pipeline.remote()
diff --git a/courses/orchestrator_course/Modal/github_pipeline_modal_backfill.py
b/courses/orchestrator_course/Modal/github_pipeline_modal_backfill.py
new file mode 100644
index 0000000..e96e308
--- /dev/null
+++ b/courses/orchestrator_course/Modal/github_pipeline_modal_backfill.py
@@ -0,0 +1,111 @@
+"""Manual backfill of the GitHub ``forks`` resource on Modal.
+
+Pass ``--start-date`` and ``--end-date`` to ``modal run`` to restrict the
+``forks`` resource to that ``created_at`` window.
+"""
+import modal
+from datetime import datetime, timezone
+
+app = modal.App("run-github-pipeline")
+dlt_image = modal.Image.debian_slim(python_version="3.12").run_commands(
+    "apt-get update",
+    'pip install "dlt[bigquery]"',
+).add_local_python_source("github_pipeline")
+
+# Layout of the cursor values used for ``forks`` in github_pipeline.py
+# (for example "2025-07-01T00:00:00Z").
+CURSOR_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
+
+
+def _parse_utc(value: str) -> datetime:
+    """Parse an ISO-8601 string into an aware UTC datetime.
+
+    Input without a UTC offset is interpreted as UTC. Sub-second precision
+    is dropped because ``CURSOR_FORMAT`` cannot represent it.
+    """
+    parsed = datetime.fromisoformat(value)
+    if parsed.tzinfo is None:
+        parsed = parsed.replace(tzinfo=timezone.utc)
+    return parsed.astimezone(timezone.utc).replace(microsecond=0)
+
+
+# Deliberately unscheduled: a backfill is started by hand for one window.
+@app.function(
+    image=dlt_image,
+    secrets=[modal.Secret.from_name("github-api"), modal.Secret.from_name("googlecloud-secret-bigquery")],
+)
+def run_pipeline(start_date: str | None = None, end_date: str | None = None):
+    """Run the GitHub pipeline, optionally backfilling ``forks`` for a window.
+
+    Args:
+        start_date: ISO-8601 lower bound for ``forks.created_at``.
+        end_date: ISO-8601 upper bound for ``forks.created_at``.
+
+    Returns:
+        The load info object returned by ``pipeline.run``.
+
+    Raises:
+        ValueError: If only one bound is given or the window is empty.
+        RuntimeError: If loaded forks fall outside the requested window.
+    """
+    import dlt
+    from github_pipeline import github_source
+
+    # A half-specified window would silently turn into a full reload.
+    if bool(start_date) != bool(end_date):
+        raise ValueError("start_date and end_date must be provided together")
+
+    window = None
+    if start_date and end_date:
+        window = (_parse_utc(start_date), _parse_utc(end_date))
+        if window[0] >= window[1]:
+            raise ValueError("start_date must be earlier than end_date")
+
+    print("Starting pipeline setup...")
+    pipeline = dlt.pipeline(
+        pipeline_name="github_pipeline",
+        destination="bigquery",
+        dataset_name="alena_github_data",
+    )
+    print("Pipeline created.")
+
+    if window:
+        # Backfilling: pass the bounds in the same string layout as the
+        # cursor values so that they compare correctly against them.
+        github_source.forks.apply_hints(
+            incremental=dlt.sources.incremental(
+                "created_at",
+                initial_value=window[0].strftime(CURSOR_FORMAT),
+                end_value=window[1].strftime(CURSOR_FORMAT),
+                row_order="asc",
+            )
+        )
+
+    load_info = pipeline.run(github_source, write_disposition="replace")
+    print("Pipeline run complete.")
+    print("Load info:", load_info)
+
+    forks_df = pipeline.dataset().forks.df()
+    # Nothing to verify when the window matched no forks.
+    if forks_df is None or forks_df.empty:
+        print("No forks loaded.")
+        return load_info
+
+    oldest = forks_df["created_at"].min()
+    newest = forks_df["created_at"].max()
+    print(oldest, newest)
+
+    # Explicit check instead of ``assert`` so it also runs under ``python -O``.
+    if window and (oldest < window[0] or newest > window[1]):
+        raise RuntimeError(
+            f"forks.created_at range {oldest}..{newest} is outside the "
+            f"requested window {window[0]}..{window[1]}"
+        )
+
+    return load_info
+
+
+@app.local_entrypoint()
+def main(start_date: str | None = None, end_date: str | None = None):
+    """Forward the CLI ``--start-date`` / ``--end-date`` options to Modal."""
+    run_pipeline.remote(start_date, end_date)
diff
--git a/courses/orchestrator_course/Modal/github_pipeline_modal_parallel.py b/courses/orchestrator_course/Modal/github_pipeline_modal_parallel.py
new file mode 100644
index 0000000..af1f071
--- /dev/null
+++ b/courses/orchestrator_course/Modal/github_pipeline_modal_parallel.py
@@ -0,0 +1,50 @@
+"""Scheduled Modal app loading each GitHub resource in a separate call."""
+import modal
+
+app = modal.App("run-github-pipeline-parallel")
+dlt_image = modal.Image.debian_slim(python_version="3.12").run_commands(
+    "apt-get update",
+    'pip install "dlt[bigquery]"',
+).add_local_python_source("github_pipeline")
+
+
+# Worker: loads a single resource; ``main`` fans out one call per resource.
+@app.function(
+    image=dlt_image,
+    secrets=[modal.Secret.from_name("github-api"), modal.Secret.from_name("googlecloud-secret-bigquery")],
+)
+def run_pipeline(resource_name: str):
+    """Load one resource of ``github_source`` into BigQuery.
+
+    Args:
+        resource_name: Name of the resource to load; it is also appended to
+            the pipeline name, giving every resource its own pipeline.
+
+    Returns:
+        The load info object returned by ``pipeline.run``.
+    """
+    import dlt
+    from github_pipeline import github_source
+
+    print("Starting pipeline setup...")
+    pipeline = dlt.pipeline(
+        pipeline_name=f"github_pipeline_{resource_name}",
+        destination="bigquery",
+        dataset_name="github_data",
+    )
+    print("Pipeline created.")
+
+    load_info = pipeline.run(github_source.with_resources(resource_name))
+    print("Pipeline run complete.")
+    print(load_info)
+    return load_info
+
+
+# Presumably needs the GitHub secret because importing github_pipeline reads it.
+@app.function(image=dlt_image, schedule=modal.Period(minutes=1), secrets=[modal.Secret.from_name("github-api")])
+def main():
+    """Spawn one ``run_pipeline`` call per resource of ``github_source``."""
+    from github_pipeline import github_source
+
+    resource_names = list(github_source.resources)
+    run_pipeline.spawn_map(resource_names)