Deeplake Dataloaders¶
Deeplake integrates natively with PyTorch and TensorFlow, letting you stream data directly from managed tables into your training loop. No local downloads, no custom data-loading code -- just open a table and start training.
Objective¶
Learn how to feed a Deeplake managed table into a PyTorch DataLoader, a TensorFlow tf.data pipeline, or simple batch iteration for model training and data processing.
Prerequisites¶
- Deeplake SDK: `pip install deeplake`
- A deep learning framework: `pip install torch torchvision` (or `pip install tensorflow`)
- A Deeplake API token. Set your credentials before running any of the examples.
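For example, the token can be exported before launching training. The variable name below is an assumption -- check the Deeplake credentials documentation for the exact name your deployment expects:

```shell
# Hypothetical variable name; consult the Deeplake credential docs for
# the exact name your deployment expects.
export DEEPLAKE_TOKEN="<your-api-token>"
```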
Complete Code¶
import io

import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from PIL import Image

from deeplake import Client

# 1. Connect to Deeplake
client = Client()
ds = client.open_table("training_data")

# 2. Create a PyTorch DataLoader with a per-sample transform.
# ds.pytorch(transform=fn) passes each sample as a Row object;
# call sample.to_dict() to get a mutable dict, then modify it.
tform = transforms.Compose([
    transforms.RandomRotation(20),
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
])

def apply_transform(sample):
    d = sample.to_dict()
    d["image"] = tform(Image.open(io.BytesIO(d["image"])).convert("RGB"))
    return d

loader = DataLoader(
    ds.pytorch(transform=apply_transform),
    batch_size=32,
    shuffle=True,
    num_workers=4,
)

# 3. Standard training loop; images arrive as tensors
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
for batch in loader:
    images = batch["image"].to(device)
    labels = batch["label"].to(device)
    # training_step(images, labels)
Step-by-Step Breakdown¶
1. PyTorch DataLoader¶
The most common pattern. Call ds.pytorch() to get a PyTorch-compatible dataset, then wrap it in a standard DataLoader for batching, shuffling, and parallel prefetching.
from torch.utils.data import DataLoader
from deeplake import Client

client = Client()
ds = client.open_table("training_data")

loader = DataLoader(
    ds.pytorch(),
    batch_size=32,
    shuffle=True,
    num_workers=4,
)

for batch in loader:
    images = batch["image"]  # list of bytes (encoded images)
    labels = batch["label"]  # Tensor: (batch_size,)
    # training_step(images, labels)
2. Applying Transforms¶
Deeplake streams image columns as raw encoded bytes. Use ds.pytorch(transform=fn) to decode and transform each sample before batching. The transform receives a Row object; call .to_dict() to get a mutable dict.
import io

from PIL import Image
from torchvision import transforms

tform = transforms.Compose([
    transforms.RandomRotation(20),
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
])

def apply_transform(sample):
    d = sample.to_dict()
    d["image"] = tform(Image.open(io.BytesIO(d["image"])).convert("RGB"))
    return d

loader = DataLoader(
    ds.pytorch(transform=apply_transform),
    batch_size=32,
    shuffle=True,
    num_workers=4,
)

for batch in loader:
    images = batch["image"].to(device)  # Tensor: (batch_size, C, H, W)
    labels = batch["label"].to(device)
    # training_step(images, labels)
3. TensorFlow Dataset¶
Call ds.tensorflow() to get a tf.data.Dataset. Chain .batch() and .prefetch() for optimal GPU utilization.
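A minimal sketch of the pipeline. Since no live table is assumed here, a `from_tensor_slices` dataset with the same column layout stands in for `ds.tensorflow()`:

```python
import tensorflow as tf

# Stand-in for ds.tensorflow(): a tf.data.Dataset with the same column
# layout ("image", "label"). With a live table you would instead write:
#     tf_ds = ds.tensorflow()
tf_ds = tf.data.Dataset.from_tensor_slices({
    "image": tf.zeros([100, 64, 64, 3], dtype=tf.float32),
    "label": tf.zeros([100], dtype=tf.int64),
})

# Chain .batch() and .prefetch() so the input pipeline prepares the next
# batch while the accelerator consumes the current one.
pipeline = tf_ds.batch(32).prefetch(tf.data.AUTOTUNE)

for batch in pipeline:
    images = batch["image"]  # shape: (batch_size, 64, 64, 3)
    labels = batch["label"]  # shape: (batch_size,)
    # training_step(images, labels)
```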
You can also iterate manually:
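For example, each element of the dataset is a dict of tensors, one row at a time (again using a `from_tensor_slices` stand-in for `ds.tensorflow()`, since no live table is assumed):

```python
import tensorflow as tf

# Stand-in for ds.tensorflow(); with a live table, iterate ds.tensorflow()
# directly in the same way.
tf_ds = tf.data.Dataset.from_tensor_slices({
    "image": tf.zeros([10, 64, 64, 3]),
    "label": tf.range(10, dtype=tf.int64),
})

labels = []
for row in tf_ds.take(3):  # inspect the first three rows
    labels.append(int(row["label"]))

print(labels)  # [0, 1, 2]
```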
4. Simple Batch Iteration¶
For non-training workloads (preprocessing, analysis, inference), use ds.batches() for straightforward iteration without framework dependencies.
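A sketch of the pattern. Since no live table is assumed here, `fake_batches` mimics the columnar dicts that `ds.batches()` yields:

```python
from collections import Counter

def fake_batches(rows, batch_size):
    # Mimics ds.batches(batch_size=...): yields dicts mapping column
    # names to lists of values.
    for i in range(0, len(rows), batch_size):
        chunk = rows[i:i + batch_size]
        yield {
            "image": [r["image"] for r in chunk],
            "label": [r["label"] for r in chunk],
        }

rows = [{"image": b"...", "label": i % 3} for i in range(10)]

# Non-training workload: count label frequencies across the whole table.
counts = Counter()
for batch in fake_batches(rows, batch_size=4):  # live table: ds.batches(batch_size=4)
    counts.update(batch["label"])

print(counts)  # Counter({0: 4, 1: 3, 2: 3})
```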
5. Async Data Loading¶
For datasets with large tensors (images, video, embeddings), sequential row fetching becomes the bottleneck. Use get_async() to fetch multiple columns concurrently with an asyncio worker thread.
import asyncio
import queue
import threading

from torch.utils.data import IterableDataset, DataLoader
from deeplake import Client

class AsyncDeepLakeDataset(IterableDataset):
    def __init__(self, ds, concurrency=128):
        self.ds = ds
        self.concurrency = concurrency
        # queue.Queue is the right channel between threads
        # (multiprocessing.Queue would add pickling overhead).
        self.queue = queue.Queue(maxsize=concurrency * 2)

    def _worker(self):
        loop = asyncio.new_event_loop()

        async def fetch_all():
            sem = asyncio.Semaphore(self.concurrency)

            async def fetch_one(idx):
                async with sem:
                    row = self.ds[idx]
                    image, label = await asyncio.gather(
                        row.get_async("image"),
                        row.get_async("label"),
                    )
                    self.queue.put((image, label))

            tasks = [fetch_one(i) for i in range(len(self.ds))]
            await asyncio.gather(*tasks)

        loop.run_until_complete(fetch_all())
        self.queue.put(None)  # sentinel: no more rows

    def __iter__(self):
        thread = threading.Thread(target=self._worker, daemon=True)
        thread.start()
        while True:
            item = self.queue.get()
            if item is None:
                break
            yield item

client = Client()
ds = client.open_table("training_data")
loader = DataLoader(AsyncDeepLakeDataset(ds), batch_size=32)

for images, labels in loader:
    # training_step(images, labels)
    pass
This pattern fetches up to 128 rows concurrently in a background thread, yielding them through a queue. On image-heavy datasets, this can be 2-3x faster than sequential loading.
6. Custom PyTorch Dataset Wrapper¶
For complex preprocessing or multi-table joins, wrap open_table in a custom torch.utils.data.Dataset.
import io

import torch
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

from deeplake import Client

class DeepLakeDataset(Dataset):
    def __init__(self, client, table_name, transform=None):
        self.ds = client.open_table(table_name)
        self.transform = transform
        self.length = len(self.ds)

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        row = self.ds[idx]
        # Image columns return raw bytes; decode to PIL
        image = Image.open(io.BytesIO(row["image"])).convert("RGB")
        label = row["label"]
        if self.transform:
            image = self.transform(image)
        return image, label

# Usage
client = Client()
dataset = DeepLakeDataset(
    client,
    "training_data",
    transform=transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ]),
)
loader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)

for images, labels in loader:
    # training_step(images, labels)
    pass
Why no REST API?¶
Streaming high-performance tensor data over standard REST endpoints introduces significant latency and CPU overhead due to HTTP headers and JSON serialization. For high-throughput training, the Python SDK is the only supported method as it uses optimized C++ streaming kernels.
Performance Tips¶
| Parameter | Recommendation |
|---|---|
| `num_workers` | Start with the number of CPU cores. For I/O-bound workloads (cloud streaming), use 2-4x CPU cores. |
| `prefetch_factor` | Default is 2. Increase to 4-8 if your GPU is starved for data. |
| `batch_size` | Maximize GPU memory usage. Larger batches reduce per-sample overhead. |
| `shuffle` | Use `True` for training. Disable for inference to preserve order. |
| `pin_memory` | Set `True` when training on GPU to speed up host-to-device transfer. |
Streaming vs. local: Deeplake streams data from cloud storage by default. For repeated epochs on smaller datasets, consider caching locally to avoid redundant network I/O.
Example with all optimizations:
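Putting the table's recommendations together. A `TensorDataset` stands in for `ds.pytorch()` so the snippet runs without a live table; parameter values are illustrative:

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Stand-in for ds.pytorch(transform=...); swap in the real dataset when
# training against a live table.
dataset = TensorDataset(
    torch.randn(256, 3, 224, 224),  # images
    torch.randint(0, 10, (256,)),   # labels
)

loader = DataLoader(
    dataset,
    batch_size=64,            # as large as GPU memory allows
    shuffle=True,             # training only; keep order for inference
    num_workers=4,            # 2-4x CPU cores for cloud streaming
    prefetch_factor=4,        # raise if the GPU is starved for data
    pin_memory=True,          # faster host-to-device transfer
    persistent_workers=True,  # avoid re-spawning workers each epoch
)

for images, labels in loader:
    # training_step(images.to(device, non_blocking=True), labels.to(device))
    pass
```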
Parallel JPEG decoding: When streaming image datasets, CPU-side JPEG decode is the bottleneck (not network or GPU). Use a ThreadPoolExecutor to decode in parallel for a ~1.6x speedup:
import io
from concurrent.futures import ThreadPoolExecutor

from PIL import Image

decode_pool = ThreadPoolExecutor(max_workers=8)

def decode_jpeg(img_bytes):
    return Image.open(io.BytesIO(img_bytes)).convert("RGB")

for batch in ds.batches(batch_size=128):
    images = list(decode_pool.map(decode_jpeg, batch["image"]))
    # images is now a list of PIL Images, ready for transforms
| Decode method (batch_size=128) | rows/s | Speedup |
|---|---|---|
| PIL single-thread | ~250 | 1x |
| ThreadPool (8 workers) | ~400 | 1.6x |
Benchmark Results¶
All benchmarks run on NVIDIA A100-SXM4-40GB, streaming the aloha_shrimp dataset (56k rows, 480×640 JPEG images + 14-DOF float arrays) from Deeplake cloud storage. Reproducible with benchmark_dataloaders.py and benchmark_managed_dataloader.py.
Streaming Throughput (ds.batches())¶
| batch_size | Rows/s | MB/s | Gb/s |
|---|---|---|---|
| 64 | ~2,800 | ~70 | 0.56 |
| 128 | ~4,500 | ~100 | 1.01 |
| 256 | ~2,700 | ~68 | 0.54 |
Streaming + JPEG Decode + GPU Transfer¶
| Method (batch_size=128) | Rows/s | Bottleneck |
|---|---|---|
| PIL single-thread decode → GPU | ~250 | CPU decode |
| ThreadPool (8 workers) decode → GPU | ~400 | CPU decode |
| Raw stream (no decode) | ~4,500 | Network |
Ingestion Throughput (client.ingest())¶
| Rows | Time | Rows/s | MB/s |
|---|---|---|---|
| 1,000 | 5.5s | ~180 | ~5 |
| 2,000 | 6.5s | ~310 | ~9 |
| 50,000 | 36.4s | ~1,370 | ~38 |
VLA Fine-tuning (OpenVLA 7B + LoRA, real ALOHA data)¶
| Metric | Value |
|---|---|
| Training throughput | 7.5 rows/s |
| Loss (first 50 steps) | 106.3 |
| Loss (last 50 steps) | 7.5 |
| Loss reduction | 93% |
| GPU utilization | avg 89%, max 99% |
| GPU memory | 22.5 / 41 GB (55%) |
Known Limitations¶
| Issue | Impact |
|---|---|
| `DataLoader(shuffle=True, num_workers=0)` | ~2 rows/s -- per-row network fetch |
| `DataLoader(shuffle=True, num_workers>0)` | Hangs -- C++ fork issue |
| `IterableDataset` + `shuffle=True` | `ValueError` -- not supported by PyTorch |
| `ds.batches()` shuffle | Not supported -- shuffle in memory or via TQL |
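One way to shuffle in memory is a tf.data-style shuffle buffer layered over the batch iterator. This is a sketch, not part of the SDK; `batches` below stands in for `ds.batches()` since no live table is assumed:

```python
import random

def shuffled_rows(batches, buffer_size=1024, seed=0):
    # Maintains a fixed-size buffer; once full, yields a randomly chosen
    # element for each new row, approximating a global shuffle.
    rng = random.Random(seed)
    buf = []
    for batch in batches:
        for row in zip(batch["image"], batch["label"]):
            buf.append(row)
            if len(buf) > buffer_size:
                i = rng.randrange(len(buf))
                buf[i], buf[-1] = buf[-1], buf[i]
                yield buf.pop()
    rng.shuffle(buf)
    yield from buf

# Stand-in batches (live table: shuffled_rows(ds.batches(batch_size=256)))
batches = [
    {"image": [b"..."] * 5, "label": list(range(i, i + 5))}
    for i in range(0, 20, 5)
]
rows = list(shuffled_rows(batches, buffer_size=8, seed=1))
print(len(rows))  # 20 rows, in shuffled order
```

A larger `buffer_size` gives a more uniform shuffle at the cost of memory; TQL-side shuffling avoids the buffer entirely.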
What to try next¶
- GPU-Streaming Pipeline: direct-to-GPU streaming for training.
- Massive Ingestion: prepare large-scale datasets for training.
- Reference: Querying: details on `open_table()`.