Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions courses/orchestrator_course/Modal/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
Sign up or log in at modal.com.

Download and configure the Python client
Run the following commands to install the Python library locally:

```
pip install modal
python3 -m modal setup
```

The first command will install the Modal client library on your computer, along with its dependencies.

The second command creates an API token by authenticating through your web browser. It will open a new tab, but you can close it when you are done.

Follow the instructions in the Quick Start guide.

Add dlt and the GitHub source as image dependencies: https://modal.com/docs/examples/webscraper#add-dependencies

```py
dlt_image = modal.Image.debian_slim(python_version="3.12").run_commands(
"apt-get update",
'pip install "dlt[bigquery]"',
).add_local_python_source("github_pipeline")
```


### Credentials

Secrets are attached directly to functions:
```py
@app.function(
image=dlt_image,
secrets=[modal.Secret.from_name("github-api")]
)
def run_pipeline(resource):
...
```

### Run locally
```shell
modal run github_pipeline_modal.py
```

### Backfilling
```shell
modal run github_pipeline_modal_backfill.py --start-date '2025-08-01 05:47:07+00:00' --end-date '2025-09-01 05:47:07+00:00'
```

### Deploy
```shell
modal deploy --name github_scheduled github_pipeline_modal.py
```

### Deploy in parallel

```shell
modal deploy --name github_scheduled_parallel github_pipeline_modal_parallel.py
```
Empty file.
82 changes: 82 additions & 0 deletions courses/orchestrator_course/Modal/github_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import dlt
from dlt.sources.rest_api import RESTAPIConfig, rest_api_source

# Declarative configuration for dlt's generic REST API source, pointed at the
# GitHub API. Auth token is resolved from dlt secrets (secrets.toml, env vars,
# or an injected Modal secret at runtime).
config: RESTAPIConfig = {
    "client": {
        "base_url": "https://api.github.com",
        "auth": {
            # Resolved at import time from dlt secrets.
            "token": dlt.secrets["sources.access_token"],
        },
        "headers": {
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
        },
        # GitHub paginates via the RFC-5988 Link response header.
        "paginator": "header_link",
    },
    "resources": [
        {
            # All repositories of the dlt-hub organization.
            "name": "repos",
            "endpoint": {"path": "orgs/dlt-hub/repos"},
        },
        {
            # Contributors to the dlt-hub/dlt repository.
            "name": "contributors",
            "endpoint": {
                "path": "repos/dlt-hub/dlt/contributors",
            },
        },
        {
            # Issues, loaded incrementally on the updated_at cursor.
            "name": "issues",
            "endpoint": {
                "path": "repos/dlt-hub/dlt/issues",
                "params": {
                    "state": "open",  # Only get open issues
                    "sort": "updated",
                    "direction": "desc",
                    # Placeholder filled from the incremental cursor below.
                    "since": "{incremental.start_value}",  # For incremental loading
                },
                "incremental": {
                    "cursor_path": "updated_at",
                    "initial_value": "2025-03-01T00:00:00Z",
                },
            },
        },
        {
            # Forks, loaded incrementally on created_at. The backfill script
            # overrides this incremental via apply_hints.
            "name": "forks",
            "endpoint": {
                "path": "repos/dlt-hub/dlt/forks",
                "params": {
                    "sort": "oldest",  # Ensures ascending creation order
                    "per_page": 100,
                },
                "incremental": {
                    "cursor_path": "created_at",
                    "initial_value": "2025-07-01T00:00:00Z",
                    "row_order": "asc",
                },
            },
        },
        {
            # Published releases of dlt-hub/dlt (full load, no incremental).
            "name": "releases",
            "endpoint": {
                "path": "repos/dlt-hub/dlt/releases",
            },
        },
    ],
}

# Build the source once at import time so the Modal wrapper modules can do
# `from github_pipeline import github_source`.
github_source = rest_api_source(config)

# Only run the pipeline if this script is executed directly
if __name__ == "__main__":
    print("Starting pipeline setup...")

    # Local ad-hoc run: load everything into a DuckDB file next to the script.
    local_pipeline = dlt.pipeline(
        pipeline_name="github_repos_issues",
        destination="duckdb",
        dataset_name="github_data",
        progress="log",  # emit progress logs during the load
    )

    run_info = local_pipeline.run(github_source)
    print("Pipeline run complete.")
    print(local_pipeline.last_trace)
35 changes: 35 additions & 0 deletions courses/orchestrator_course/Modal/github_pipeline_modal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import modal

# Modal app wrapping the dlt GitHub pipeline for remote, scheduled execution.
app = modal.App("run-github-pipeline")

# Container image: Debian slim with Python 3.12, dlt with the BigQuery extra,
# and the local github_pipeline module copied into the image.
dlt_image = modal.Image.debian_slim(python_version="3.12").run_commands(
    "apt-get update",
    'pip install "dlt[bigquery]"',
).add_local_python_source("github_pipeline")

# Scheduled Modal function that runs the dlt GitHub source into BigQuery.
@app.function(
    image=dlt_image,
    # GitHub token and BigQuery credentials are injected as env vars in the container.
    secrets=[modal.Secret.from_name("github-api"), modal.Secret.from_name("googlecloud-secret-bigquery")],
    # NOTE(review): a 1-minute period is very aggressive for a full pipeline run —
    # presumably a demo setting; confirm before a real deployment.
    schedule=modal.Period(minutes=1),
)
def run_pipeline():
    """Run the GitHub dlt pipeline remotely on Modal, loading into BigQuery.

    Returns the dlt load info object from ``pipeline.run``.
    """
    # Imported inside the function so resolution happens inside the container image.
    import dlt
    from github_pipeline import github_source

    print("Starting pipeline setup...")
    pipeline = dlt.pipeline(
        pipeline_name="github_pipeline",
        destination="bigquery",
        dataset_name="alena_github_data",
    )
    print("Pipeline created.")

    load_info = pipeline.run(github_source)
    print("Pipeline run complete.")
    print("Load info:", load_info)
    return load_info


@app.local_entrypoint()
def main():
    """Entry point for ``modal run``: invoke the pipeline remotely and wait for it."""
    run_pipeline.remote()
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import modal
from datetime import datetime

# NOTE(review): this backfill module reuses the app name "run-github-pipeline"
# (same as the non-backfill module) — deploying both would collide on that name;
# confirm whether a distinct name (e.g. "...-backfill") was intended.
app = modal.App("run-github-pipeline")

# Container image: Debian slim with Python 3.12, dlt with the BigQuery extra,
# and the local github_pipeline module copied into the image.
dlt_image = modal.Image.debian_slim(python_version="3.12").run_commands(
    "apt-get update",
    'pip install "dlt[bigquery]"',
).add_local_python_source("github_pipeline")

# Modal function that runs the GitHub dlt source into BigQuery, optionally
# restricted to a backfill window on the `forks` resource.
@app.function(
    image=dlt_image,
    # GitHub token and BigQuery credentials are injected as env vars in the container.
    secrets=[modal.Secret.from_name("github-api"), modal.Secret.from_name("googlecloud-secret-bigquery")],
    # NOTE(review): a 1-minute schedule on a backfill job looks unintentional —
    # scheduled invocations receive no dates and still do a full replace load
    # every minute; confirm whether the schedule should be removed here.
    schedule=modal.Period(minutes=1),

)
def run_pipeline(start_date: str | None = None, end_date: str | None = None):
    """Run the GitHub pipeline into BigQuery, optionally backfilling `forks`.

    Args:
        start_date: ISO-8601 timestamp (timezone-aware) — lower bound of the
            forks backfill window. Both dates must be given to trigger a backfill.
        end_date: ISO-8601 timestamp — upper bound of the backfill window.

    Returns the dlt load info object from ``pipeline.run``.
    """
    # Imported inside the function so resolution happens inside the container image.
    import dlt
    from github_pipeline import github_source

    print("Starting pipeline setup...")
    pipeline = dlt.pipeline(
        pipeline_name="github_pipeline",
        destination="bigquery",
        dataset_name="alena_github_data",
    )
    print("Pipeline created.")

    if start_date and end_date:
        # Backfilling: override the forks resource's incremental cursor so only
        # rows with created_at inside [start_date, end_date) are loaded.
        github_source.forks.apply_hints(
            incremental=dlt.sources.incremental(
                "created_at",
                initial_value=start_date,
                end_value=end_date,
                row_order="asc"
            )
        )

    # "replace" drops and reloads the destination tables on every run.
    load_info = pipeline.run(github_source, write_disposition="replace")
    print("Pipeline run complete.")
    print("Load info:", load_info)

    # Sanity check: read back the loaded forks and print the created_at range.
    dataset = pipeline.dataset()
    forks_df = dataset.forks.df()
    print(forks_df.loc[:, "created_at"].min(), forks_df.loc[:, "created_at"].max())

    if start_date and end_date:
        # Verify every loaded row falls inside the requested window.
        # Assumes both sides are timezone-aware (e.g. '2025-08-01 05:47:07+00:00')
        # so the comparison is valid — TODO confirm against the destination schema.
        assert forks_df.loc[:, "created_at"].min() >= datetime.fromisoformat(start_date), forks_df.loc[:, "created_at"].min()
        assert forks_df.loc[:, "created_at"].max() <= datetime.fromisoformat(end_date), forks_df.loc[:, "created_at"].max()

    return load_info


@app.local_entrypoint()
def main(start_date: str | None = None, end_date: str | None = None):
    """Entry point for ``modal run``: forward the optional backfill window remotely."""
    run_pipeline.remote(start_date, end_date)
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import modal

# Modal app that loads each GitHub resource in its own parallel container.
app = modal.App("run-github-pipeline-parallel")

# Container image: Debian slim with Python 3.12, dlt with the BigQuery extra,
# and the local github_pipeline module copied into the image.
dlt_image = modal.Image.debian_slim(python_version="3.12").run_commands(
    "apt-get update",
    'pip install "dlt[bigquery]"',
).add_local_python_source("github_pipeline")


# Worker function: loads exactly one resource of the GitHub source into BigQuery.
# Fanned out in parallel by main() below, one container per resource.
@app.function(
    image=dlt_image,
    # GitHub token and BigQuery credentials are injected as env vars in the container.
    secrets=[modal.Secret.from_name("github-api"), modal.Secret.from_name("googlecloud-secret-bigquery")],
)
def run_pipeline(resource_name):
    """Load a single named resource from the GitHub source into BigQuery.

    Args:
        resource_name: name of one resource in ``github_source`` (e.g. "repos").

    Returns the dlt load info object from ``pipeline.run``.
    """
    # Imported inside the function so resolution happens inside the container image.
    import dlt
    from github_pipeline import github_source

    print("Starting pipeline setup...")
    pipeline = dlt.pipeline(
        # One pipeline name per resource — presumably so concurrent workers keep
        # separate dlt state and don't collide; verify against dlt state docs.
        pipeline_name=f"github_pipeline_{resource_name}",
        destination="bigquery",
        dataset_name="github_data",
    )
    print("Pipeline created.")

    # Restrict the source to just the requested resource.
    load_info = pipeline.run(github_source.with_resources(resource_name))
    print("Pipeline run complete.")
    print(load_info)
    return load_info


# Scheduled fan-out: every minute, spawn one run_pipeline container per resource.
# Only needs the GitHub secret because importing github_pipeline reads the token.
@app.function(image=dlt_image, schedule=modal.Period(minutes=1), secrets=[modal.Secret.from_name("github-api")])
def main():
    """Spawn run_pipeline once per resource of the GitHub source, in parallel."""
    from github_pipeline import github_source
    # Iterating the source's resources yields the resource names.
    resources = list(github_source.resources)

    # spawn_map fires the calls without waiting for their results.
    run_pipeline.spawn_map(resources)