Apache Airflow

Apache Airflow is a widely used open-source platform for authoring, scheduling, and monitoring data pipelines. DAGs (Directed Acyclic Graphs) define workflows as Python code, giving you full control over task dependencies, retries, and scheduling.

Deeplake + Airflow lets you build scheduled data pipelines that ingest, transform, and query multimodal datasets (images, videos, embeddings, PDFs) without writing custom orchestration logic.

Objective

Build an Airflow DAG that runs on a daily schedule to:

  1. Fetch the latest news articles from an RSS feed
  2. Generate embeddings with OpenAI
  3. Ingest titles, URLs, and embeddings into a Deeplake table
  4. Verify the data landed and is searchable

Architecture

┌─────────────────────────────────────────────┐
│  Airflow Scheduler (runs DAG daily)         │
└──────────────┬──────────────────────────────┘
    ┌──────────▼──────────────────────────┐
    │  Task 1: fetch_news                 │
    │  Parse RSS feed for new articles    │
    └──────────┬──────────────────────────┘
               │  XCom (titles + URLs)
    ┌──────────▼──────────────────────────┐
    │  Task 2: embed_articles             │
    │  OpenAI text-embedding-3-small      │
    └──────────┬──────────────────────────┘
               │  XCom (+ embeddings)
    ┌──────────▼──────────────────────────┐
    │  Task 3: ingest_to_deeplake         │
    │  client.ingest() with index         │
    └──────────┬──────────────────────────┘
    ┌──────────▼──────────────────────────┐
    │  Task 4: verify_ingestion           │
    │  client.open_table() + len()        │
    └─────────────────────────────────────┘
        Deeplake Cloud
        (searchable vector store)
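
The diagram passes titles, URLs, and embeddings between tasks through XCom. Airflow's documentation describes XCom as intended for small amounts of data, so it can help to estimate the payload size before scaling the feed beyond 20 articles. The sketch below uses synthetic data and plain JSON as a rough proxy; the helper name `xcom_payload_bytes` is hypothetical, and Airflow's real serialization may produce a somewhat different size.

```python
import json
import random


def xcom_payload_bytes(payload: dict) -> int:
    """Return the size in bytes of the payload when serialized as JSON."""
    return len(json.dumps(payload).encode("utf-8"))


# Synthetic stand-in for the embed_articles output:
# 20 articles, 1536-dimensional vectors (the text-embedding-3-small default).
rng = random.Random(0)
payload = {
    "titles": [f"Article {i}" for i in range(20)],
    "urls": [f"https://example.com/{i}" for i in range(20)],
    "embeddings": [[rng.uniform(-1, 1) for _ in range(1536)] for _ in range(20)],
}

size = xcom_payload_bytes(payload)
print(f"Approximate XCom payload: {size / 1024:.0f} KiB")
```

If the estimate grows into many megabytes, one option is to pass only a reference (for example a file path) through XCom and keep the vectors elsewhere.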

Prerequisites

  • Apache Airflow: Installation guide
  • Deeplake SDK: pip install deeplake
  • A Deeplake API token
  • An OpenAI API key (for embeddings)
  • RSS and OpenAI client libraries: pip install feedparser openai

Set credentials in Airflow

Store your tokens as environment variables in the environment where Airflow runs your tasks (scheduler and workers), so that every task can read them:

export DEEPLAKE_API_KEY="your-deeplake-token"
export OPENAI_API_KEY="your-openai-key"
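
A missing variable otherwise only surfaces when a task first calls the corresponding API. As a minimal sketch, a preflight check like the one below could run at the start of a task or in a separate script; the helper name `missing_credentials` is hypothetical and not part of Airflow or Deeplake.

```python
import os

# Variable names taken from the export commands above.
REQUIRED_VARS = ("DEEPLAKE_API_KEY", "OPENAI_API_KEY")


def missing_credentials(env=None) -> list:
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


missing = missing_credentials()
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
else:
    print("All credentials are set.")
```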

Complete DAG

from datetime import datetime, timedelta
from airflow.decorators import dag, task

TABLE_NAME = "news_articles"


@dag(
    dag_id="deeplake_news_pipeline",
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["deeplake", "embeddings"],
)
def deeplake_news_pipeline():
    """Fetch news, embed, and ingest into Deeplake daily."""

    @task
    def fetch_news() -> dict:
        """Fetch latest articles from Hacker News RSS."""
        import feedparser

        feed = feedparser.parse("https://hnrss.org/newest?count=20")

        titles = [entry.title for entry in feed.entries]
        urls = [entry.link for entry in feed.entries]

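        # feedparser does not raise on network errors; it returns an empty
        # entry list, so fail early instead of sending empty input downstream.
        if not titles:
            raise ValueError("RSS feed returned no entries.")

        print(f"Fetched {len(titles)} articles.")
        return {"titles": titles, "urls": urls}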

    @task(retries=3, retry_delay=timedelta(seconds=30))
    def embed_articles(data: dict) -> dict:
        """Generate embeddings for article titles."""
        import openai

        client = openai.OpenAI()
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=data["titles"],
        )
        embeddings = [item.embedding for item in response.data]

        print(f"Generated {len(embeddings)} embeddings.")
        return {
            "titles": data["titles"],
            "urls": data["urls"],
            "embeddings": embeddings,
        }

    @task(retries=3, retry_delay=timedelta(seconds=30))
    def ingest_to_deeplake(data: dict) -> int:
        """Ingest articles with embeddings into Deeplake."""
        from deeplake import Client

        client = Client()

        result = client.ingest(
            table_name=TABLE_NAME,
            data={
                "title": data["titles"],
                "url": data["urls"],
                "embedding": data["embeddings"],
            },
            schema={
                "title": "text",
                "url": "text",
            },
            index=["embedding"],
        )

        row_count = result["row_count"]
        print(f"Ingested {row_count} articles into {TABLE_NAME}.")
        return row_count

    @task
    def verify_ingestion(row_count: int):
        """Verify data landed using open_table."""
        from deeplake import Client

        client = Client()
        table = client.open_table(TABLE_NAME)
        total = len(table)
        print(f"Table '{TABLE_NAME}' has {total} total rows (added {row_count}).")

        if row_count == 0:
            raise ValueError("No rows were ingested — check the RSS feed.")

    # Task dependencies
    news = fetch_news()
    embedded = embed_articles(news)
    count = ingest_to_deeplake(embedded)
    verify_ingestion(count)


deeplake_news_pipeline()

Save this file as dags/deeplake_news_pipeline.py in your Airflow DAGs folder.

Running the DAG

# Test a single run locally
airflow dags test deeplake_news_pipeline 2026-01-01

# Or trigger via CLI
airflow dags trigger deeplake_news_pipeline

Querying the Data

Once the DAG has run, you can search your news articles by meaning:

from deeplake import Client
import openai

client = Client()
oai = openai.OpenAI()

# Embed your search query
query_embedding = oai.embeddings.create(
    model="text-embedding-3-small",
    input=["machine learning breakthroughs"],
).data[0].embedding

# Semantic search: find the most relevant articles
results = client.query(
    "SELECT title, url FROM news_articles ORDER BY embedding <#> $1 LIMIT 5",
    (query_embedding,),
)

for row in results:
    print(f"{row['title']}")
    print(f"  {row['url']}")
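
To make the ORDER BY clause above more concrete, here is a pure-Python sketch of ranking rows by inner product with a query vector. It assumes the operator ranks by inner product, as in pgvector-style SQL; it is not Deeplake's implementation, and the names `dot` and `top_k` are illustrative. OpenAI embeddings are normalized to unit length, so for them inner product and cosine similarity give the same ordering.

```python
def dot(a, b):
    """Inner product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def top_k(query, rows, k=5):
    """Rank (title, embedding) rows by descending inner product with query."""
    scored = sorted(rows, key=lambda row: dot(query, row[1]), reverse=True)
    return [title for title, _ in scored[:k]]


# Toy 2-dimensional unit vectors standing in for real embeddings.
rows = [
    ("A", [1.0, 0.0]),
    ("B", [0.0, 1.0]),
    ("C", [0.6, 0.8]),
]

print(top_k([1.0, 0.0], rows, k=2))  # prints ['A', 'C']
```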

Tips

Backfill historical data

Set catchup=True and read each run's logical date (ds) from the task context to backfill day by day:

from datetime import datetime

import feedparser
from airflow.decorators import dag, task


@dag(schedule="@daily", catchup=True, start_date=datetime(2026, 1, 1))
def my_dag():

    @task
    def fetch_news(**context):
        # "ds" is the run's logical date as a string, e.g., "2026-03-15"
        execution_date = context["ds"]
        # Fetch only articles from this date
        feed = feedparser.parse(
            f"https://hnrss.org/newest?count=20&q=after:{execution_date}"
        )
        ...
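
If the feed cannot filter by date on the server side, the same effect can be approximated on the client. The sketch below keeps only entries published on the run's logical date, relying on feedparser's `published_parsed` attribute (a `time.struct_time` in UTC). The helper name `entries_on` and the sample entries are hypothetical. Note that a "newest" feed only serves recent items, so this cannot recover articles that have already dropped out of the feed.

```python
import time
from datetime import date
from types import SimpleNamespace


def entries_on(entries, ds: str):
    """Keep entries whose UTC publication date equals ds ('YYYY-MM-DD')."""
    target = date.fromisoformat(ds)
    kept = []
    for entry in entries:
        parsed = getattr(entry, "published_parsed", None)
        if parsed is None:
            continue  # skip entries without a parseable date
        if date(parsed.tm_year, parsed.tm_mon, parsed.tm_mday) == target:
            kept.append(entry)
    return kept


# Stand-ins for feedparser entries.
sample = [
    SimpleNamespace(title="old", published_parsed=time.gmtime(0)),
    SimpleNamespace(
        title="new",
        published_parsed=time.strptime("2026-03-15", "%Y-%m-%d"),
    ),
    SimpleNamespace(title="undated"),
]

print([e.title for e in entries_on(sample, "2026-03-15")])  # prints ['new']
```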

Retry on transient failures

The embed_articles and ingest_to_deeplake tasks have retries=3 configured. Airflow retries a task whenever it raises an exception, which covers transient failures such as network timeouts and API rate limit errors:

@task(retries=3, retry_delay=timedelta(seconds=30))
def embed_articles(data: dict) -> dict:
    ...
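
As a quick worked example of what these settings imply: retries=3 allows up to three retries after the first attempt, so four attempts in total, with a fixed 30-second wait before each retry. The helper `retry_budget` below is a hypothetical illustration and assumes the fixed delay shown above (no exponential backoff).

```python
from datetime import timedelta


def retry_budget(retries: int, retry_delay: timedelta):
    """Return (maximum attempts, total wait between attempts) for fixed delays."""
    attempts = retries + 1  # the first attempt plus each retry
    total_wait = retry_delay * retries
    return attempts, total_wait


attempts, total_wait = retry_budget(3, timedelta(seconds=30))
print(attempts, total_wait)  # prints: 4 0:01:30
```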

Swap the embedding model

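Replace OpenAI with any embedding provider. The only requirements are that embedding is a list of float lists, one vector per title, and that search queries are embedded with the same model used at ingestion so the vectors are comparable: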

# Sentence Transformers (local, free)
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = model.encode(titles).tolist()

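# Cohere (v3 embed models require input_type; the client reads
# the API key from the CO_API_KEY environment variable)
import cohere
co = cohere.Client()
response = co.embed(
    texts=titles,
    model="embed-english-v3.0",
    input_type="search_document",
)
embeddings = response.embeddings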
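
Different models produce different vector sizes (for example, all-MiniLM-L6-v2 yields 384 dimensions, while text-embedding-3-small defaults to 1536), so a shape check before ingestion can catch a mismatch early. The following is a minimal sketch; `check_embeddings` is a hypothetical helper, not part of any of the libraries above.

```python
def check_embeddings(embeddings, expected_count, expected_dim=None):
    """Validate a list of float lists and return the shared dimension."""
    if len(embeddings) != expected_count:
        raise ValueError(
            f"Expected {expected_count} vectors, got {len(embeddings)}."
        )
    dims = {len(vector) for vector in embeddings}
    if len(dims) > 1:
        raise ValueError(f"Vectors have mixed dimensions: {sorted(dims)}.")
    dim = dims.pop() if dims else 0
    if expected_dim is not None and embeddings and dim != expected_dim:
        raise ValueError(f"Expected dimension {expected_dim}, got {dim}.")
    for vector in embeddings:
        if not all(isinstance(x, float) for x in vector):
            raise TypeError("Every embedding value must be a Python float.")
    return dim


titles = ["first title", "second title"]
embeddings = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]  # toy vectors
dim = check_embeddings(embeddings, expected_count=len(titles))
print(f"{len(embeddings)} vectors of dimension {dim}")
```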

What to try next