Physical AI & Robotics

Physical AI agents (robots, autonomous vehicles, drones) generate massive streams of multimodal data: high-FPS video, LiDAR point clouds, and sensor telemetry. Deeplake's GPU-native engine lets you ingest this data at scale and stream it directly into training loops without bottlenecking on the CPU.

Objective

Demonstrate a robotics data pipeline: ingest raw sensor data + video frames, store them with high-precision timestamps, and enable direct streaming to a GPU for training.

Prerequisites

  • Deeplake SDK: pip install deeplake
  • Robotics AI stack: pip install torch "transformers>=4.48,<5" pillow accelerate
  • A Deeplake API token.

Set credentials first

export DEEPLAKE_API_KEY="your-token-here"
export DEEPLAKE_WORKSPACE="your-workspace"  # optional, defaults to "default"

Complete Code

import time
import torch
from PIL import Image
from deeplake import Client
from transformers import AutoModel, AutoProcessor

# 1. Setup GPU-Native Robotics Data Lake
client = Client()

# 2. Setup Multimodal Encoder (ColQwen3 for visual state)
MODEL_ID = "TomoroAI/tomoro-colqwen3-embed-4b"
device = "cuda" if torch.cuda.is_available() else "cpu"
processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)
model = AutoModel.from_pretrained(
    MODEL_ID, trust_remote_code=True, torch_dtype=torch.bfloat16, device_map=device
).eval()

def get_visual_embedding(image_path):
    """Generates a multi-vector visual embedding for the given image."""
    img = Image.open(image_path).convert("RGB")
    inputs = processor.process_images(images=[img])
    inputs = {k: v.to(device) for k, v in inputs.items()}
    with torch.inference_mode():
        return model(**inputs).embeddings[0].cpu().float().numpy()

# Multi-vector columns (FLOAT4[][]) require CREATE TABLE, then ingest into the existing table
client.query("""
    CREATE TABLE IF NOT EXISTS "robot_telemetry" (
        timestamp FLOAT8,
        camera_rgb TEXT,
        joint_angles FLOAT4[],
        forces FLOAT4[],
        visual_embedding FLOAT4[][],
        is_anomaly BOOLEAN
    ) USING deeplake
""")

# Log a batch of robot telemetry steps
image_paths = ["frame_001.png", "frame_002.png"]
joint_data = [[0.1, 0.2, 0.3], [0.15, 0.25, 0.35]]
force_data = [[1.2, 0.5], [1.3, 0.6]]

client.ingest("robot_telemetry", {
    "timestamp": [time.time(), time.time() + 0.1],
    "camera_rgb": image_paths,
    "joint_angles": joint_data,
    "forces": force_data,
    "visual_embedding": [get_visual_embedding(p) for p in image_paths],
    "is_anomaly": [False, False],
})

# 3. Retrieve Similar States (for Decision Planning)
# Search for episodes where the robot was in a similar visual state
def get_similar_episodes(query_image_path):
    # Generate multi-vector embedding for the current visual state
    current_emb = get_visual_embedding(query_image_path).tolist()

    # Format multi-vector as PG array literal: {{v1},{v2},...}
    emb_pg = "{" + ",".join(
        "{" + ",".join(str(v) for v in row) + "}" for row in current_emb
    ) + "}"
    return (
        client.table("robot_telemetry")
            .select("timestamp", "camera_rgb", "joint_angles")
            .order_by(f"visual_embedding <#> '{emb_pg}'::float4[][] DESC")
            .limit(3)
            .execute()
    )

# 4. Direct Streaming to GPU (Training Loop)
from torch.utils.data import DataLoader

ds = client.open_table("robot_telemetry")
dataloader = DataLoader(ds.pytorch(), batch_size=32, num_workers=4)

for batch in dataloader:
    # Tensors are loaded directly into GPU memory
    # train_policy(batch["camera_rgb"], batch["joint_angles"])
    print(f"Batch loaded: {len(batch['timestamp'])} steps")
    break
The same pipeline can also be driven over the REST API with curl:

# Requires: export DEEPLAKE_API_KEY="..." (see quickstart)
# Requires: export DEEPLAKE_WORKSPACE="your-workspace"
# Requires: export DEEPLAKE_ORG_ID="your-org-id"
API_URL="https://api.deeplake.ai"
TABLE="robot_telemetry"

# 1. Create the multimodal schema for a robotics data lake
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DEEPLAKE_API_KEY" \
  -H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
  -d '{
    "query": "CREATE TABLE IF NOT EXISTS \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" (id BIGSERIAL PRIMARY KEY, timestamp FLOAT8, camera_rgb TEXT, joint_angles FLOAT4[], forces FLOAT4[], visual_embedding FLOAT4[][], is_anomaly BOOLEAN) USING deeplake"
  }'

# 2. Insert robot sensor metadata
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DEEPLAKE_API_KEY" \
  -H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
  -d '{
    "query": "INSERT INTO \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" (timestamp, camera_rgb, joint_angles, forces, visual_embedding, is_anomaly) VALUES (1700000001.0, $1, $2::float4[], $3::float4[], $4::float4[][], false)",
    "params": ["frame_001.png", "{0.1,0.2,0.3}", "{1.2,0.5}", "{{0.1,0.2,0.3},{0.4,0.5,0.6}}"]
  }'

# 3. Query recent anomalies
curl -s -X POST "$API_URL/workspaces/$DEEPLAKE_WORKSPACE/tables/query" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DEEPLAKE_API_KEY" \
  -H "X-Activeloop-Org-Id: $DEEPLAKE_ORG_ID" \
  -d '{
    "query": "SELECT timestamp, joint_angles, forces FROM \"'$DEEPLAKE_WORKSPACE'\".\"'$TABLE'\" WHERE is_anomaly = true ORDER BY timestamp DESC LIMIT 10"
  }'

Step-by-Step Breakdown

1. GPU-Native Data Flow

Traditionally, robotics data is stored in slow file systems and moved to GPUs during training, creating a massive bottleneck. Deeplake stores data in a format that GPUs can read directly, enabling the high-throughput training required for modern foundation models in robotics.
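The core idea, overlapping data loading with computation so the GPU never waits, can be sketched in plain Python with a background prefetcher. This is a toy stand-in for what the streaming loader does internally; load_batch is a hypothetical placeholder for a storage read:

```python
import queue
import threading

def load_batch(i):
    # Hypothetical stand-in for reading one batch from storage.
    return [i] * 4

def prefetch_batches(num_batches, depth=2):
    """Yield batches while a background thread keeps up to `depth` staged."""
    q = queue.Queue(maxsize=depth)
    SENTINEL = object()

    def producer():
        for i in range(num_batches):
            q.put(load_batch(i))   # blocks when the buffer is full
        q.put(SENTINEL)

    threading.Thread(target=producer, daemon=True).start()
    while (batch := q.get()) is not SENTINEL:
        yield batch                # the training step runs here, in parallel

batches = list(prefetch_batches(3))
print(batches)  # [[0, 0, 0, 0], [1, 1, 1, 1], [2, 2, 2, 2]]
```

The bounded queue is the key design choice: the producer stalls when the consumer falls behind, so memory stays constant while I/O and compute overlap.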

2. Temporal Precision

By using FLOAT8 for timestamps and indexed metadata columns, you can perform range queries (e.g., WHERE timestamp BETWEEN x AND y) to retrieve specific "episodes" of robot movement for fine-tuning.
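As a sketch, an episode between two timestamps can be pulled with a plain range query built against the table from the Complete Code above (the helper below only constructs the SQL string; passing it to client.query is the assumed usage):

```python
def episode_query(table, t_start, t_end):
    """Build a range query that retrieves one episode of telemetry,
    ordered so steps replay in recorded order."""
    return (
        f'SELECT timestamp, camera_rgb, joint_angles, forces '
        f'FROM "{table}" '
        f"WHERE timestamp BETWEEN {t_start} AND {t_end} "
        f"ORDER BY timestamp ASC"
    )

sql = episode_query("robot_telemetry", 1700000001.0, 1700000003.5)
print(sql)
# client.query(sql) would then return the episode's steps in order
```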

3. Native Multimodal Support

Deeplake managed tables can store raw binary data (images/video) alongside structured sensor arrays (FLOAT4[]). This ensures that your video frames are perfectly synchronized with your telemetry data, a requirement for imitation learning.
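When frames and telemetry are captured on separate clocks, they still need to be joined on time. A minimal alignment helper (pure Python, hypothetical sample data) pairs each frame with the nearest telemetry sample:

```python
import bisect

def align_to_frames(frame_ts, telem_ts):
    """For each frame timestamp, return the index of the nearest telemetry
    sample. Assumes telem_ts is sorted ascending (true for an append-only log)."""
    indices = []
    for t in frame_ts:
        i = bisect.bisect_left(telem_ts, t)
        # Candidates are the samples just before and just after t.
        candidates = [j for j in (i - 1, i) if 0 <= j < len(telem_ts)]
        indices.append(min(candidates, key=lambda j: abs(telem_ts[j] - t)))
    return indices

frames = [0.00, 0.11, 0.19]        # 3 camera frames
telemetry = [0.00, 0.05, 0.10, 0.15, 0.20]  # 20 Hz joint readings
print(align_to_frames(frames, telemetry))   # [0, 2, 4]
```

Storing both modalities in one table with a shared FLOAT8 timestamp column makes this alignment a query-time detail rather than a preprocessing pipeline of its own.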

Performance Tips

  • Normalization Workers: When ingesting thousands of sensor files, pass normalization_workers=8 to client.ingest() to parallelize file processing.
  • Buffering: Use commit_every=5000 for high-frequency streams to balance peak throughput against crash recovery.
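The effect of a commit_every-style setting can be illustrated with a small write buffer that flushes once it reaches its threshold. This is a toy model, not the client's actual commit logic:

```python
class CommitBuffer:
    """Accumulate rows and flush every `commit_every` rows, mirroring how a
    high-frequency stream trades per-row commit overhead for recovery points."""
    def __init__(self, commit_every, flush):
        self.commit_every = commit_every
        self.flush = flush          # callable that persists one batch
        self.rows = []

    def add(self, row):
        self.rows.append(row)
        if len(self.rows) >= self.commit_every:
            self.flush(self.rows)
            self.rows = []

commits = []
buf = CommitBuffer(commit_every=3, flush=lambda rows: commits.append(len(rows)))
for step in range(7):
    buf.add({"timestamp": step * 0.01})
print(commits)  # [3, 3] -- one row still buffered; it would be lost on a crash
```

A larger threshold means fewer, cheaper commits but a larger window of uncommitted rows at risk, which is exactly the trade-off the tip above describes.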

What to try next