CUDA Streams with LatentRuntimeEngine

CUDA streams enable parallel execution of operations on the GPU, allowing multiple LatentRuntimeEngine instances to run concurrently without blocking each other. This guide explains when and how to use CUDA streams effectively with LatentRuntimeEngine.

What are CUDA Streams?

CUDA streams are sequences of operations that execute in order on the GPU. Different streams can execute concurrently, enabling:

  • Parallel model inference: Run multiple models simultaneously
  • Improved GPU utilization: Overlap computation and memory transfers
  • Reduced latency: Minimize idle time between operations

When to Use CUDA Streams

Consider using CUDA streams when you have:

  1. Multiple models: Running different models concurrently
  2. High-throughput requirements: Need to maximize GPU utilization
  3. Multi-threaded applications: Different threads processing different data streams
  4. Pipeline processing: Overlapping preprocessing, inference, and postprocessing

Basic Setup

Creating CUDA Streams

import torch
from pylre import LatentRuntimeEngine as LRE

# Create CUDA streams
stream1 = torch.cuda.Stream()
stream2 = torch.cuda.Stream()

Initializing LRE with Streams

# Initialize LRE engines with different streams
lre1 = LRE(
    "model1.onnx", 
    cuda_stream=stream1.cuda_stream,
    tensorrt_engine_cache="/path/to/cache"
)

lre2 = LRE(
    "model2.onnx", 
    cuda_stream=stream2.cuda_stream,
    tensorrt_engine_cache="/path/to/cache"
)

Multi-threaded Execution Example

Here's an example showing how to use CUDA streams with multiple threads:

import torch
from pylre import LatentRuntimeEngine as LRE
import threading
import queue
import time

# Create CUDA streams
s1 = torch.cuda.Stream()
s2 = torch.cuda.Stream()

# Initialize LRE engines with different streams
lre1 = LRE(
    "model1.onnx", 
    cuda_stream=s1.cuda_stream
)
lre2 = LRE(
    "model2.onnx", 
    cuda_stream=s2.cuda_stream
)

# Create queue for error handling
error_queue = queue.Queue()

def worker_lre1():
    """Worker function for first LRE instance"""
    try:
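        # Make s1 the current stream for this thread; torch.cuda.stream()
        # works on all PyTorch versions, while a bare `with s1:` needs a recent one
        with torch.cuda.stream(s1):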
            for _ in range(100):
                random_tensor1 = torch.randn(1, 3, 640, 640, device="cuda")
                out1 = lre1(random_tensor1)
    except Exception as e:
        error_queue.put(('lre1', e))

def worker_lre2():
    """Worker function for second LRE instance"""
    try:
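        # Make s2 the current stream for this thread; torch.cuda.stream()
        # works on all PyTorch versions, while a bare `with s2:` needs a recent one
        with torch.cuda.stream(s2):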
            for _ in range(100):
                random_tensor2 = torch.randn(1, 3, 640, 640, device="cuda")
                out2 = lre2(random_tensor2)
    except Exception as e:
        error_queue.put(('lre2', e))

# Create and start threads
thread1 = threading.Thread(target=worker_lre1)
thread2 = threading.Thread(target=worker_lre2)

thread1.start()
thread2.start()

# Wait for threads to complete
thread1.join()
thread2.join()

# Check for any errors
while not error_queue.empty():
    worker, error = error_queue.get()
    print(f"Error in {worker}: {error}")

# Synchronize all CUDA operations
torch.cuda.synchronize()
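
The two workers above differ only in the engine and stream they use, so the pattern extends to any number of engines. The following is a minimal sketch of that idea, not part of the pylre API: `run_engines` and the layout of `jobs` are hypothetical names chosen for illustration. Each job supplies a callable engine (an LRE instance in this guide), a context manager that makes the worker's stream current (for example `torch.cuda.stream(s1)`), and an input factory. The demo uses plain Python stand-ins so it runs without a GPU.

```python
import contextlib
import queue
import threading


def run_engines(jobs, iterations=100):
    """Run each (name, engine, stream_ctx, make_input) job in its own thread.

    Returns a list of (name, exception) pairs for workers that failed.
    """
    errors = queue.Queue()

    def worker(name, engine, stream_ctx, make_input):
        try:
            # Work issued inside the context goes to this worker's stream.
            with stream_ctx:
                for _ in range(iterations):
                    engine(make_input())
        except Exception as exc:  # report the failure instead of losing it
            errors.put((name, exc))

    threads = [threading.Thread(target=worker, args=job) for job in jobs]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    failed = []
    while not errors.empty():
        failed.append(errors.get())
    return failed


# Stand-ins so the sketch runs without a GPU. In real use, pass LRE
# instances, torch.cuda.stream(...) contexts, and tensor factories.
calls = []


def broken_engine(x):
    raise RuntimeError("boom")


jobs = [
    ("ok", calls.append, contextlib.nullcontext(), lambda: 1),
    ("bad", broken_engine, contextlib.nullcontext(), lambda: 1),
]
failed = run_engines(jobs, iterations=3)
for name, exc in failed:
    print(f"Error in {name}: {exc}")
```

Adding a third engine then only means adding a third entry to `jobs`.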

Best Practices

1. Stream Context Management

Use a stream context manager so that the operations you launch are queued on the intended stream:

with torch.cuda.stream(stream):
    # All CUDA operations in this block use the specified stream
    tensor = torch.randn(1, 3, 640, 640, device="cuda")
    output = lre(tensor)

2. Error Handling

Implement proper error handling in multi-threaded environments:

import queue

error_queue = queue.Queue()

def worker_function():
    try:
        # Your LRE operations
        pass
    except Exception as e:
        error_queue.put(('worker_name', e))

# Check for errors after threads complete
while not error_queue.empty():
    worker, error = error_queue.get()
    print(f"Error in {worker}: {error}")
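
As an alternative to a hand-rolled error queue, the standard library's `concurrent.futures` stores any exception raised by a worker on its future. The sketch below assumes each worker is a zero-argument callable; `run_all` and the worker names are hypothetical and only illustrate the pattern.

```python
from concurrent.futures import ThreadPoolExecutor


def run_all(workers):
    """Run named callables in threads; return {name: exception or None}."""
    with ThreadPoolExecutor(max_workers=len(workers)) as pool:
        futures = {name: pool.submit(fn) for name, fn in workers.items()}
    # Leaving the with-block waits for every worker to finish.
    return {name: fut.exception() for name, fut in futures.items()}


def good_worker():
    return "done"  # stand-in for a loop of LRE calls


def bad_worker():
    raise ValueError("inference failed")


outcome = run_all({"lre1": good_worker, "lre2": bad_worker})
for name, exc in outcome.items():
    if exc is not None:
        print(f"Error in {name}: {exc!r}")
```

This keeps the worker bodies free of try/except blocks, at the cost of routing all threads through one executor.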

3. Memory Management

Be mindful of GPU memory usage when using multiple streams:

# Monitor GPU memory
print(f"GPU memory allocated: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
print(f"GPU memory reserved: {torch.cuda.memory_reserved() / 1024**3:.2f} GB")

# Clear cache if needed
torch.cuda.empty_cache()

4. Synchronization

Use appropriate synchronization based on your needs:

# Synchronize specific stream
stream.synchronize()

# Synchronize all CUDA operations
torch.cuda.synchronize()

# Check if stream operations are complete (non-blocking)
if stream.query():
    print("Stream operations completed")
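
Because `stream.query()` does not block, the host can keep doing other work while it waits. The sketch below shows one way to poll with a timeout. `wait_for` is a hypothetical helper, not a PyTorch or pylre function; it accepts any zero-argument callable, so the demo uses a stand-in instead of a real stream.

```python
import time


def wait_for(is_done, timeout=5.0, poll_interval=0.001, on_idle=None):
    """Poll is_done() until it returns True or `timeout` seconds pass.

    Returns True if the work finished, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while not is_done():
        if time.monotonic() >= deadline:
            return False
        if on_idle is not None:
            on_idle()  # do useful host-side work between polls
        time.sleep(poll_interval)
    return True


# Stand-in for stream.query: reports completion on the third poll.
polls = {"count": 0}


def fake_query():
    polls["count"] += 1
    return polls["count"] >= 3


finished = wait_for(fake_query, timeout=1.0)
timed_out = wait_for(lambda: False, timeout=0.01)
```

With a real stream the call would look like `wait_for(stream.query, timeout=2.0)`.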

Performance Considerations

Stream Concurrency

How many streams actually run concurrently depends on your hardware and workload:

  • GPU compute capability: Higher capability GPUs support more concurrent kernels
  • Memory bandwidth: All streams share the same memory bandwidth
  • Model complexity: Kernels that already fill every multiprocessor leave little room for other streams to overlap

Optimal Stream Count

# Check GPU properties
device = torch.cuda.current_device()
props = torch.cuda.get_device_properties(device)
print(f"Multi-processor count: {props.multi_processor_count}")
print(f"Max threads per multi-processor: {props.max_threads_per_multi_processor}")

# Rule of thumb: Start with 2-4 streams and benchmark
num_streams = min(4, props.multi_processor_count // 2)
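
Since the rule of thumb is only a starting point, it helps to measure. The sketch below times a workload at several stream counts and returns the fastest. `pick_stream_count` and `run_with_streams` are hypothetical names: you would supply a function that runs your real workload on `n` streams and returns only after the GPU work has finished. The demo uses `time.sleep` as a stand-in so it runs anywhere.

```python
import time


def pick_stream_count(run_with_streams, candidates=(1, 2, 4), repeats=3):
    """Time run_with_streams(n) for each candidate n; return (best_n, timings).

    run_with_streams(n) must run the full workload on n streams and
    return only after the GPU work is done (e.g. after synchronizing).
    """
    timings = {}
    for n in candidates:
        run_with_streams(n)  # warm-up run, not timed
        samples = []
        for _ in range(repeats):
            start = time.perf_counter()
            run_with_streams(n)
            samples.append(time.perf_counter() - start)
        timings[n] = min(samples)  # best-of-N reduces timing noise
    best = min(timings, key=timings.get)
    return best, timings


# Stand-in workload that pretends two streams is the sweet spot.
fake_cost = {1: 0.05, 2: 0.005, 4: 0.03}
best, timings = pick_stream_count(lambda n: time.sleep(fake_cost[n]))
print(f"Fastest stream count: {best}")
```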

Common Pitfalls

1. Stream Assignment

Ensure tensors and operations are on the correct stream:

# Incorrect: tensor created on a different stream
with torch.cuda.stream(stream1):
    tensor = torch.randn(1, 3, 640, 640, device="cuda")

with torch.cuda.stream(stream2):
    output = lre(tensor)  # Race: stream2 may read tensor before stream1 has written it

# Correct: all operations on same stream
with torch.cuda.stream(stream1):
    tensor = torch.randn(1, 3, 640, 640, device="cuda")
    output = lre(tensor)
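
When data genuinely has to cross streams, a CUDA event can order the two streams without blocking the host. The sketch below uses PyTorch's `torch.cuda.Event`, `Stream.wait_event`, and `Tensor.record_stream`. `handoff` is a hypothetical helper, and the multiplication is a stand-in for the inference call; whether an LRE instance follows the current PyTorch stream or only the stream passed at construction is not covered here and should be checked against the pylre documentation.

```python
try:
    import torch
except ImportError:  # keep the sketch importable without PyTorch
    torch = None


def handoff(producer_stream, consumer_stream, shape=(1, 3, 640, 640)):
    """Create a tensor on one stream and consume it safely on another."""
    with torch.cuda.stream(producer_stream):
        tensor = torch.randn(*shape, device="cuda")
        ready = torch.cuda.Event()
        ready.record()  # marks the point where tensor is fully written

    with torch.cuda.stream(consumer_stream):
        # Queue a wait on the consumer stream; the host is not blocked.
        consumer_stream.wait_event(ready)
        # Tell the caching allocator that consumer_stream also uses tensor.
        tensor.record_stream(consumer_stream)
        result = tensor * 2  # stand-in for: output = lre(tensor)
    return tensor, result


if torch is not None and torch.cuda.is_available():
    s1, s2 = torch.cuda.Stream(), torch.cuda.Stream()
    tensor, result = handoff(s1, s2)
    torch.cuda.synchronize()
    print(torch.equal(result, tensor * 2))
```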

2. Premature Synchronization

Avoid unnecessary synchronization that blocks parallelism:

# Avoid frequent synchronization
for i in range(iterations):
    with torch.cuda.stream(stream):
        output = lre(tensor)
    torch.cuda.synchronize()  # Blocks the host until every stream is idle

# Better: batch operations
with torch.cuda.stream(stream):
    for i in range(iterations):
        output = lre(tensor)
torch.cuda.synchronize()  # Single synchronization

3. Resource Contention

Monitor for resource contention between streams:

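# Use the PyTorch profiler to identify bottlenecks
with torch.profiler.profile(
    activities=[
        torch.profiler.ProfilerActivity.CPU,
        torch.profiler.ProfilerActivity.CUDA,
    ],
    schedule=torch.profiler.schedule(wait=1, warmup=1, active=3),
    on_trace_ready=torch.profiler.tensorboard_trace_handler('./log')
) as prof:
    for _ in range(5):  # wait + warmup + active steps
        # Your streaming operations
        prof.step()  # Advance the schedule; without this nothing is recorded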

Troubleshooting

Memory Issues

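# Check available memory (free and total bytes as reported by the CUDA driver,
# which also accounts for cached blocks and other processes)
free_memory, total_memory = torch.cuda.mem_get_info()
print(f"Free GPU memory: {free_memory / 1024**3:.2f} GB of {total_memory / 1024**3:.2f} GB")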

Stream Debugging

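# Enable CUDA launch blocking for debugging.
# Set this before the first CUDA call (safest: before importing torch).
# It serializes kernel launches, so streams will not overlap while it is on.
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'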

Performance Monitoring

# Use NVIDIA tools
# nsys profile -o report python your_script.py   (Nsight Systems; nvprof is deprecated and does not support newer GPUs)
# nvidia-smi dmon -s mu
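
For quick in-process measurements, CUDA events can time the work queued on a single stream without a device-wide synchronization. The sketch below uses `torch.cuda.Event(enable_timing=True)` and `Event.elapsed_time`. `time_on_stream` is a hypothetical helper, and the matrix multiply stands in for an inference call.

```python
try:
    import torch
except ImportError:  # keep the sketch importable without PyTorch
    torch = None


def time_on_stream(fn, stream, iterations=10):
    """Return the average milliseconds per call of fn() on `stream`."""
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    with torch.cuda.stream(stream):
        fn()  # warm-up call, not timed
        start.record()
        for _ in range(iterations):
            fn()
        end.record()
    end.synchronize()  # wait only for the work recorded before `end`
    return start.elapsed_time(end) / iterations


if torch is not None and torch.cuda.is_available():
    stream = torch.cuda.Stream()
    x = torch.randn(1024, 1024, device="cuda")
    # x was created on the default stream, so order the new stream after it.
    stream.wait_stream(torch.cuda.current_stream())
    avg_ms = time_on_stream(lambda: x @ x, stream)
    print(f"{avg_ms:.3f} ms per call")
```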

This guide provides the foundation for using CUDA streams effectively with LatentRuntimeEngine. Start with simple scenarios and gradually increase complexity as you become comfortable with stream management.