Deeplake Dataloaders

Deeplake integrates natively with PyTorch and TensorFlow, letting you stream data directly from managed tables into your training loop. No local downloads and no custom data-loading code: just open a table and start training.

Objective

Learn how to feed a Deeplake managed table into a PyTorch DataLoader, a TensorFlow tf.data pipeline, or simple batch iteration for model training and data processing.

Prerequisites

  • Deeplake SDK: pip install deeplake
  • Deep Learning framework: pip install torch torchvision (or pip install tensorflow).
  • A Deeplake API token.

Set credentials first

export DEEPLAKE_API_KEY="your-token-here"
export DEEPLAKE_WORKSPACE="your-workspace"  # optional, defaults to "default"
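If you prefer to fail fast in Python rather than hit a late authentication error, you can check these variables before creating the client. A minimal sketch (`require_env` is an illustrative helper, not part of the SDK):

```python
import os

def require_env(name, default=None):
    """Return an environment variable, or raise a clear error up front."""
    value = os.environ.get(name, default)
    if value is None:
        raise RuntimeError(f"Set {name} before creating the Deeplake Client")
    return value

# api_key = require_env("DEEPLAKE_API_KEY")                          # required
# workspace = require_env("DEEPLAKE_WORKSPACE", default="default")   # optional
```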

Complete Code

import io
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from PIL import Image
from deeplake import Client

# 1. Connect to Deeplake
client = Client()
ds = client.open_table("training_data")

# 2. Create a PyTorch DataLoader with a per-sample transform
# ds.pytorch(transform=fn) passes each sample as a Row object.
# Call sample.to_dict() to get a mutable dict, then modify it.
tform = transforms.Compose([
    transforms.RandomRotation(20),
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
])

def apply_transform(sample):
    d = sample.to_dict()
    d["image"] = tform(Image.open(io.BytesIO(d["image"])).convert("RGB"))
    return d

loader = DataLoader(
    ds.pytorch(transform=apply_transform),
    batch_size=32,
    shuffle=True,
    num_workers=4,
)

# 3. Standard training loop, images arrive as tensors
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for batch in loader:
    images = batch["image"].to(device)
    labels = batch["label"].to(device)
    # training_step(images, labels)

Step-by-Step Breakdown

1. PyTorch DataLoader

The most common pattern. Call ds.pytorch() to get a PyTorch-compatible dataset, then wrap it in a standard DataLoader for batching, shuffling, and parallel prefetching.

from torch.utils.data import DataLoader
from deeplake import Client

client = Client()
ds = client.open_table("training_data")

loader = DataLoader(
    ds.pytorch(),
    batch_size=32,
    shuffle=True,
    num_workers=4,
)

for batch in loader:
    images = batch["image"]   # list of bytes (encoded images)
    labels = batch["label"]   # Tensor: (batch_size,)
    # training_step(images, labels)

2. Applying Transforms

Deeplake streams image columns as raw encoded bytes. Use ds.pytorch(transform=fn) to decode and transform each sample before batching. The transform receives a Row object; call .to_dict() to get a mutable dict.

import io
from PIL import Image
from torchvision import transforms

tform = transforms.Compose([
    transforms.RandomRotation(20),
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
])

def apply_transform(sample):
    d = sample.to_dict()
    d["image"] = tform(Image.open(io.BytesIO(d["image"])).convert("RGB"))
    return d

loader = DataLoader(
    ds.pytorch(transform=apply_transform),
    batch_size=32,
    shuffle=True,
    num_workers=4,
)

for batch in loader:
    images = batch["image"].to(device)   # Tensor: (batch_size, C, H, W)
    labels = batch["label"].to(device)
    # training_step(images, labels)

3. TensorFlow Dataset

Call ds.tensorflow() to get a tf.data.Dataset. Chain .batch() and .prefetch() for optimal GPU utilization.

import tensorflow as tf
from deeplake import Client

client = Client()
ds = client.open_table("training_data")

tf_ds = ds.tensorflow().batch(32).prefetch(tf.data.AUTOTUNE)

model.fit(tf_ds, epochs=10)

You can also iterate manually:

for batch in tf_ds:
    images = batch["image"]   # tf.Tensor
    labels = batch["label"]   # tf.Tensor
    # training_step(images, labels)

4. Simple Batch Iteration

For non-training workloads (preprocessing, analysis, inference), use ds.batches() for straightforward iteration without framework dependencies.

ds = client.open_table("large_dataset")

for batch in ds.batches(1000):
    process(batch)
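The chunked iteration pattern itself is easy to reason about. For local testing without a Deeplake connection, an equivalent over a plain Python sequence looks like this (a sketch; ds.batches() additionally handles streaming and column layout for you):

```python
def batches(rows, batch_size):
    """Yield consecutive chunks of `rows`, mirroring the ds.batches() pattern."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]

# 2,500 rows in batches of 1,000 -> chunks of 1000, 1000, 500
sizes = [len(b) for b in batches(list(range(2500)), 1000)]
```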

5. Async Data Loading

For datasets with large tensors (images, video, embeddings), sequential row fetching becomes the bottleneck. Use get_async() to fetch multiple columns concurrently with an asyncio worker thread.

import asyncio
import threading
from queue import Queue  # thread-safe; the worker runs in a thread, not a separate process
from torch.utils.data import IterableDataset, DataLoader
from deeplake import Client

class AsyncDeepLakeDataset(IterableDataset):
    def __init__(self, ds, concurrency=128):
        self.ds = ds
        self.concurrency = concurrency
        self.queue = Queue(maxsize=concurrency * 2)

    def _worker(self):
        loop = asyncio.new_event_loop()

        async def fetch_all():
            sem = asyncio.Semaphore(self.concurrency)
            async def fetch_one(idx):
                async with sem:
                    row = self.ds[idx]
                    image, label = await asyncio.gather(
                        row.get_async("image"),
                        row.get_async("label"),
                    )
                    self.queue.put((image, label))
            tasks = [fetch_one(i) for i in range(len(self.ds))]
            await asyncio.gather(*tasks)

        loop.run_until_complete(fetch_all())
        self.queue.put(None)  # Sentinel

    def __iter__(self):
        thread = threading.Thread(target=self._worker, daemon=True)
        thread.start()
        while True:
            item = self.queue.get()
            if item is None:
                break
            yield item

client = Client()
ds = client.open_table("training_data")
loader = DataLoader(AsyncDeepLakeDataset(ds), batch_size=32)

for images, labels in loader:
    # training_step(images, labels)
    pass

This pattern fetches up to 128 rows concurrently in a background thread, yielding them through a queue. On image-heavy datasets, this can be 2-3x faster than sequential loading.
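To verify the speedup on your own table, a simple rows-per-second harness around any loader is enough (a sketch; `loader` can be any iterable of batches, including the DataLoader above):

```python
import time

def measure_throughput(loader, max_batches=100):
    """Iterate over up to max_batches batches and return observed rows/s."""
    start = time.perf_counter()
    rows = 0
    for i, batch in enumerate(loader):
        rows += len(batch)
        if i + 1 >= max_batches:
            break
    elapsed = time.perf_counter() - start
    return rows / elapsed if elapsed > 0 else float("inf")
```

Run it once against the sequential loader and once against the async one to compare like for like.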

6. Custom PyTorch Dataset Wrapper

For complex preprocessing or multi-table joins, wrap open_table in a custom torch.utils.data.Dataset.

import io
import torch
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from deeplake import Client

class DeepLakeDataset(Dataset):
    def __init__(self, client, table_name, transform=None):
        self.ds = client.open_table(table_name)
        self.transform = transform
        self.length = len(self.ds)

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        row = self.ds[idx]
        # Image columns return raw bytes, decode to PIL
        image = Image.open(io.BytesIO(row["image"])).convert("RGB")
        label = row["label"]

        if self.transform:
            image = self.transform(image)

        return image, label

# Usage
client = Client()

dataset = DeepLakeDataset(
    client,
    "training_data",
    transform=transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ]),
)

loader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)

for images, labels in loader:
    # training_step(images, labels)
    pass

Why no REST API?

Streaming high-performance tensor data over standard REST endpoints introduces significant latency and CPU overhead from HTTP headers and JSON serialization. For high-throughput training, the Python SDK is the only supported method; it uses optimized C++ streaming kernels.

Performance Tips

  • num_workers: Start with the number of CPU cores. For I/O-bound workloads (cloud streaming), use 2-4x CPU cores.
  • prefetch_factor: Default is 2. Increase to 4-8 if your GPU is starved for data.
  • batch_size: Maximize GPU memory usage. Larger batches reduce per-sample overhead.
  • shuffle: Use True for training. Disable for inference to preserve order.
  • pin_memory: Set True when training on GPU to speed up host-to-device transfer.

Streaming vs. local: Deeplake streams data from cloud storage by default. For repeated epochs on smaller datasets, consider caching locally to avoid redundant network I/O.
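One way to cache locally is to stream the table once, persist each batch to disk, and read from disk on later epochs. A minimal sketch using pickle (`cache_batches` is an illustrative helper, not part of the SDK; pass it `ds.batches(...)` as the iterator):

```python
import os
import pickle

def cache_batches(batch_iter, cache_dir):
    """Epoch 1: stream and persist each batch. Later epochs: replay from disk."""
    os.makedirs(cache_dir, exist_ok=True)
    marker = os.path.join(cache_dir, "complete")
    if not os.path.exists(marker):
        for i, batch in enumerate(batch_iter):
            with open(os.path.join(cache_dir, f"batch_{i:06d}.pkl"), "wb") as f:
                pickle.dump(batch, f)
        open(marker, "w").close()  # only replay caches that finished writing
    for name in sorted(os.listdir(cache_dir)):
        if name.endswith(".pkl"):
            with open(os.path.join(cache_dir, name), "rb") as f:
                yield pickle.load(f)
```

For large binary columns you would likely swap pickle for a columnar format, but the marker-file pattern (never replay a partially written cache) carries over.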

Example with all optimizations:

loader = DataLoader(
    ds.pytorch(),
    batch_size=64,
    shuffle=True,
    num_workers=8,
    prefetch_factor=4,
    pin_memory=True,
    persistent_workers=True,
)

Parallel JPEG decoding: When streaming image datasets, JPEG decode on CPU is the bottleneck (not network or GPU). Use ThreadPoolExecutor to decode in parallel for a ~1.6x speedup:

from concurrent.futures import ThreadPoolExecutor

decode_pool = ThreadPoolExecutor(max_workers=8)

def decode_jpeg(img_bytes):
    return Image.open(io.BytesIO(img_bytes)).convert("RGB")

for batch in ds.batches(batch_size=128):
    images = list(decode_pool.map(decode_jpeg, batch["image"]))
    # images is now a list of PIL Images, ready for transforms

| Decode method (batch_size=128) | Rows/s | Speedup |
| --- | --- | --- |
| PIL single-thread | ~250 | 1x |
| ThreadPool (8 workers) | ~400 | 1.6x |

Benchmark Results

All benchmarks run on NVIDIA A100-SXM4-40GB, streaming the aloha_shrimp dataset (56k rows, 480×640 JPEG images + 14-DOF float arrays) from Deeplake cloud storage. Reproducible with benchmark_dataloaders.py and benchmark_managed_dataloader.py.

Streaming Throughput (ds.batches())

| batch_size | Rows/s | MB/s | Gb/s |
| --- | --- | --- | --- |
| 64 | ~2,800 | ~70 | 0.56 |
| 128 | ~4,500 | ~100 | 1.01 |
| 256 | ~2,700 | ~68 | 0.54 |

Streaming + JPEG Decode + GPU Transfer

| Method (batch_size=128) | Rows/s | Bottleneck |
| --- | --- | --- |
| PIL single-thread decode → GPU | ~250 | CPU decode |
| ThreadPool (8 workers) decode → GPU | ~400 | CPU decode |
| Raw stream (no decode) | ~4,500 | Network |

Ingestion Throughput (client.ingest())

| Rows | Time | Rows/s | MB/s |
| --- | --- | --- | --- |
| 1,000 | 5.5s | ~180 | ~5 |
| 2,000 | 6.5s | ~310 | ~9 |
| 50,000 | 36.4s | ~1,370 | ~38 |

VLA Fine-tuning (OpenVLA 7B + LoRA, real ALOHA data)

| Metric | Value |
| --- | --- |
| Training throughput | 7.5 rows/s |
| Loss (first 50 steps) | 106.3 |
| Loss (last 50 steps) | 7.5 |
| Loss reduction | 93% |
| GPU utilization | avg 89%, max 99% |
| GPU memory | 22.5 / 41 GB (55%) |

Known Limitations

| Issue | Impact |
| --- | --- |
| DataLoader(shuffle=True, num_workers=0) | ~2 rows/s: per-row network fetch |
| DataLoader(shuffle=True, num_workers>0) | Hangs: C++ fork issue |
| IterableDataset + shuffle=True | ValueError: not supported by PyTorch |
| ds.batches() shuffle | Not supported: shuffle in memory or via TQL |
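Since ds.batches() cannot shuffle, one in-memory workaround is to shuffle row indices yourself and fetch rows in that order via ds[idx] (as in the custom Dataset example above). A sketch (`shuffled_index_batches` is an illustrative helper, not part of the SDK):

```python
import random

def shuffled_index_batches(n_rows, batch_size, seed=None):
    """Yield batches of row indices in random order; fetch each row with ds[idx]."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    for start in range(0, n_rows, batch_size):
        yield indices[start:start + batch_size]
```

Note that per-index fetching pays the per-row network cost noted above, so combine this with get_async() or TQL-side shuffling for large tables.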

What to try next