Apache Airflow¶
Apache Airflow is the standard open-source platform for authoring, scheduling, and monitoring data pipelines. DAGs (Directed Acyclic Graphs) define workflows as Python code, giving you full control over task dependencies, retries, and scheduling.
Deeplake + Airflow lets you build scheduled data pipelines that ingest, transform, and query multimodal datasets (images, videos, embeddings, PDFs) without writing custom orchestration logic.
Objective¶
Build an Airflow DAG that runs on a daily schedule to:
- Fetch the latest news articles from an RSS feed
- Generate embeddings with OpenAI
- Ingest titles, URLs, and embeddings into a Deeplake table
- Verify the data landed and is searchable
Architecture¶
```
┌─────────────────────────────────────┐
│ Airflow Scheduler (runs DAG daily)  │
└──────────┬──────────────────────────┘
           │
┌──────────▼──────────────────────────┐
│ Task 1: fetch_news                  │
│ Parse RSS feed for new articles     │
└──────────┬──────────────────────────┘
           │ XCom (titles + URLs)
┌──────────▼──────────────────────────┐
│ Task 2: embed_articles              │
│ OpenAI text-embedding-3-small       │
└──────────┬──────────────────────────┘
           │ XCom (+ embeddings)
┌──────────▼──────────────────────────┐
│ Task 3: ingest_to_deeplake          │
│ client.ingest() with index          │
└──────────┬──────────────────────────┘
           │
┌──────────▼──────────────────────────┐
│ Task 4: verify_ingestion            │
│ client.open_table() + len()         │
└─────────────────────────────────────┘
           │
           ▼
     Deeplake Cloud
(searchable vector store)
```
Prerequisites¶
- Apache Airflow: Installation guide
- Deeplake SDK: `pip install deeplake`
- A Deeplake API token here
- An OpenAI API key (for embeddings)
- Pipeline dependencies: `pip install feedparser openai`
Set credentials in Airflow
Store your tokens as environment variables so they're available to all tasks:
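For a local setup, exporting them in the shell that starts the scheduler and workers is enough. A minimal sketch (the variable names are assumptions: the OpenAI SDK reads `OPENAI_API_KEY`, and `DEEPLAKE_TOKEN` stands in for whatever name your Deeplake client expects):

```shell
# Placeholder values: substitute your real tokens.
export DEEPLAKE_TOKEN="your-deeplake-token"
export OPENAI_API_KEY="your-openai-key"
```

In a production deployment you would typically set these through your Airflow environment configuration instead of an interactive shell.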
Complete DAG¶
```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task

TABLE_NAME = "news_articles"


@dag(
    dag_id="deeplake_news_pipeline",
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["deeplake", "embeddings"],
)
def deeplake_news_pipeline():
    """Fetch news, embed, and ingest into Deeplake daily."""

    @task
    def fetch_news() -> dict:
        """Fetch latest articles from Hacker News RSS."""
        import feedparser

        feed = feedparser.parse("https://hnrss.org/newest?count=20")
        titles = [entry.title for entry in feed.entries]
        urls = [entry.link for entry in feed.entries]
        print(f"Fetched {len(titles)} articles.")
        return {"titles": titles, "urls": urls}

    @task(retries=3, retry_delay=timedelta(seconds=30))
    def embed_articles(data: dict) -> dict:
        """Generate embeddings for article titles."""
        import openai

        client = openai.OpenAI()
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=data["titles"],
        )
        embeddings = [item.embedding for item in response.data]
        print(f"Generated {len(embeddings)} embeddings.")
        return {
            "titles": data["titles"],
            "urls": data["urls"],
            "embeddings": embeddings,
        }

    @task(retries=3, retry_delay=timedelta(seconds=30))
    def ingest_to_deeplake(data: dict) -> int:
        """Ingest articles with embeddings into Deeplake."""
        from deeplake import Client

        client = Client()
        result = client.ingest(
            table_name=TABLE_NAME,
            data={
                "title": data["titles"],
                "url": data["urls"],
                "embedding": data["embeddings"],
            },
            schema={
                "title": "text",
                "url": "text",
            },
            index=["embedding"],
        )
        row_count = result["row_count"]
        print(f"Ingested {row_count} articles into {TABLE_NAME}.")
        return row_count

    @task
    def verify_ingestion(row_count: int):
        """Verify data landed using open_table."""
        from deeplake import Client

        client = Client()
        table = client.open_table(TABLE_NAME)
        total = len(table)
        print(f"Table '{TABLE_NAME}' has {total} total rows (added {row_count}).")
        if row_count == 0:
            raise ValueError("No rows were ingested — check the RSS feed.")

    # Task dependencies
    news = fetch_news()
    embedded = embed_articles(news)
    count = ingest_to_deeplake(embedded)
    verify_ingestion(count)


deeplake_news_pipeline()
```
Save this file as `dags/deeplake_news_pipeline.py` in your Airflow DAGs folder.
Running the DAG¶
```shell
# Test a single run locally
airflow dags test deeplake_news_pipeline 2026-01-01

# Or trigger via CLI
airflow dags trigger deeplake_news_pipeline
```
Querying the Data¶
Once the DAG has run, you can search your news articles by meaning:
```python
from deeplake import Client
import openai

client = Client()
oai = openai.OpenAI()

# Embed your search query
query_embedding = oai.embeddings.create(
    model="text-embedding-3-small",
    input=["machine learning breakthroughs"],
).data[0].embedding

# Semantic search: find the most relevant articles
results = client.query(
    "SELECT title, url FROM news_articles ORDER BY embedding <#> $1 LIMIT 5",
    (query_embedding,),
)
for row in results:
    print(f"{row['title']}")
    print(f"  {row['url']}")
```
Tips¶
Backfill historical data
Set `catchup=True` and use templated dates to backfill day-by-day.
Retry on transient failures
The `embed_articles` and `ingest_to_deeplake` tasks have `retries=3` configured. Airflow will automatically retry on network timeouts or API rate limits.
Swap the embedding model
Replace OpenAI with any embedding provider. The only requirement is that `embedding` is a list of float lists:
```python
# Sentence Transformers (local, free)
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = model.encode(titles).tolist()

# Cohere
import cohere

co = cohere.Client()
response = co.embed(texts=titles, model="embed-english-v3.0")
embeddings = response.embeddings
```
What to try next¶
- Massive Data Ingestion: tune ingestion for high-throughput pipelines
- Retrieval to Training: build end-to-end ML pipelines
- Hybrid RAG: combine vector and full-text search
- Semantic Search: advanced similarity search patterns