# CUDA Streams with LatentRuntimeEngine
CUDA streams enable parallel execution of operations on the GPU, allowing multiple LatentRuntimeEngine instances to run concurrently without blocking each other. This guide explains when and how to use CUDA streams effectively with LatentRuntimeEngine.
## What are CUDA Streams?
CUDA streams are sequences of operations that execute in order on the GPU. Different streams can execute concurrently, enabling:
- Parallel model inference: Run multiple models simultaneously
- Improved GPU utilization: Overlap computation and memory transfers
- Reduced latency: Minimize idle time between operations
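The concurrency described above can be demonstrated without LRE at all. The sketch below (function name, matrix sizes, and iteration count are all arbitrary) queues independent matrix multiplies on two streams and times them with a host timer; it returns `None` when no GPU is available:

```python
import time
import torch

def run_on_two_streams(n_iters: int = 50):
    """Queue independent matmuls on two streams; return elapsed ms, or None without a GPU."""
    if not torch.cuda.is_available():
        return None
    s1, s2 = torch.cuda.Stream(), torch.cuda.Stream()
    a = torch.randn(1024, 1024, device="cuda")
    b = torch.randn(1024, 1024, device="cuda")
    torch.cuda.synchronize()  # start from an idle device
    t0 = time.perf_counter()
    with torch.cuda.stream(s1):
        for _ in range(n_iters):
            _ = a @ a
    with torch.cuda.stream(s2):
        for _ in range(n_iters):
            _ = b @ b
    torch.cuda.synchronize()  # wait for both streams before stopping the timer
    return (time.perf_counter() - t0) * 1000
```

Because the two loops touch disjoint tensors, the GPU is free to interleave their kernels.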
## When to Use CUDA Streams
Consider using CUDA streams when you have:
- Multiple models: Running different models concurrently
- High-throughput requirements: Need to maximize GPU utilization
- Multi-threaded applications: Different threads processing different data streams
- Pipeline processing: Overlapping preprocessing, inference, and postprocessing
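Pipeline processing in particular can be sketched with a dedicated copy stream that stages batch `i+1` host-to-device while batch `i` computes. The names `pipelined_batches` and `compute` below are placeholders, not part of the LRE API; it assumes at least one batch and works best with pinned host tensors:

```python
import torch

def pipelined_batches(batches, compute):
    """Overlap host-to-device copies with compute: batch i+1 is copied on a side
    stream while batch i runs. `batches` are CPU tensors (ideally pinned);
    `compute` is any GPU function. Returns outputs, or None without a GPU."""
    if not torch.cuda.is_available():
        return None
    copy_stream = torch.cuda.Stream()
    outputs = []
    # Stage the first batch on the copy stream.
    with torch.cuda.stream(copy_stream):
        nxt = batches[0].to("cuda", non_blocking=True)
    for i in range(len(batches)):
        # Make sure the current batch's copy has finished before computing on it.
        torch.cuda.current_stream().wait_stream(copy_stream)
        cur = nxt
        cur.record_stream(torch.cuda.current_stream())  # cur is now also used here
        # Start copying the next batch while the current one computes.
        if i + 1 < len(batches):
            with torch.cuda.stream(copy_stream):
                nxt = batches[i + 1].to("cuda", non_blocking=True)
        outputs.append(compute(cur))
    torch.cuda.synchronize()
    return outputs
```

With LRE, `compute` would simply be the engine call on the current stream.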
## Basic Setup

### Creating CUDA Streams
```python
import torch
from pylre import LatentRuntimeEngine as LRE

# Create CUDA streams
stream1 = torch.cuda.Stream()
stream2 = torch.cuda.Stream()
```
### Initializing LRE with Streams
```python
# Initialize LRE engines with different streams
lre1 = LRE(
    "model1.onnx",
    cuda_stream=stream1.cuda_stream,
    tensorrt_engine_cache="/path/to/cache",
)
lre2 = LRE(
    "model2.onnx",
    cuda_stream=stream2.cuda_stream,
    tensorrt_engine_cache="/path/to/cache",
)
```
## Multi-threaded Execution Example
Here's an example showing how to use CUDA streams with multiple threads:
```python
import torch
from pylre import LatentRuntimeEngine as LRE
import threading
import queue

# Create CUDA streams
s1 = torch.cuda.Stream()
s2 = torch.cuda.Stream()

# Initialize LRE engines with different streams
lre1 = LRE(
    "model1.onnx",
    cuda_stream=s1.cuda_stream,
)
lre2 = LRE(
    "model2.onnx",
    cuda_stream=s2.cuda_stream,
)

# Queue for collecting errors raised inside worker threads
error_queue = queue.Queue()

def worker_lre1():
    """Worker function for the first LRE instance."""
    try:
        # Issue all CUDA work in this thread to s1
        with torch.cuda.stream(s1):
            for _ in range(100):
                random_tensor1 = torch.randn(1, 3, 640, 640, device="cuda")
                out1 = lre1(random_tensor1)
    except Exception as e:
        error_queue.put(("lre1", e))

def worker_lre2():
    """Worker function for the second LRE instance."""
    try:
        # Issue all CUDA work in this thread to s2
        with torch.cuda.stream(s2):
            for _ in range(100):
                random_tensor2 = torch.randn(1, 3, 640, 640, device="cuda")
                out2 = lre2(random_tensor2)
    except Exception as e:
        error_queue.put(("lre2", e))

# Create and start threads
thread1 = threading.Thread(target=worker_lre1)
thread2 = threading.Thread(target=worker_lre2)
thread1.start()
thread2.start()

# Wait for threads to complete
thread1.join()
thread2.join()

# Report any errors from the workers
while not error_queue.empty():
    worker, error = error_queue.get()
    print(f"Error in {worker}: {error}")

# Synchronize all CUDA operations
torch.cuda.synchronize()
```
## Best Practices

### 1. Stream Context Management

Always wrap CUDA work in a stream context manager (`torch.cuda.stream`) so operations are issued to the intended stream:
```python
with torch.cuda.stream(stream):
    # All CUDA operations in this block run on `stream`
    tensor = torch.randn(1, 3, 640, 640, device="cuda")
    output = lre(tensor)
```
### 2. Error Handling
Implement proper error handling in multi-threaded environments:
```python
import queue

error_queue = queue.Queue()

def worker_function():
    try:
        # Your LRE operations
        pass
    except Exception as e:
        error_queue.put(("worker_name", e))

# Check for errors after threads complete
while not error_queue.empty():
    worker, error = error_queue.get()
    print(f"Error in {worker}: {error}")
```
### 3. Memory Management
Be mindful of GPU memory usage when using multiple streams:
```python
# Monitor GPU memory
print(f"GPU memory allocated: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
print(f"GPU memory reserved: {torch.cuda.memory_reserved() / 1024**3:.2f} GB")

# Release cached blocks back to the driver if needed
torch.cuda.empty_cache()
```
### 4. Synchronization
Use appropriate synchronization based on your needs:
```python
# Synchronize a specific stream (blocks the host until its queued work is done)
stream.synchronize()

# Synchronize all CUDA operations on the current device
torch.cuda.synchronize()

# Check if stream operations are complete (non-blocking)
if stream.query():
    print("Stream operations completed")
```
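When one stream's work depends on another's, CUDA events give finer-grained ordering than a device-wide `torch.cuda.synchronize()`: the dependent stream waits on the GPU without blocking the host. A minimal sketch (the function name is illustrative, not part of the LRE API):

```python
import torch

def chain_streams():
    """Make s2 wait for work queued on s1 via a CUDA event.
    Returns the result, or None without a GPU."""
    if not torch.cuda.is_available():
        return None
    s1, s2 = torch.cuda.Stream(), torch.cuda.Stream()
    done = torch.cuda.Event()
    with torch.cuda.stream(s1):
        x = torch.ones(1024, device="cuda") * 2
        done.record()          # marks this point in s1's queue
    s2.wait_event(done)        # s2 will not run past this point until the event fires
    with torch.cuda.stream(s2):
        y = x + 1              # safe: ordered after s1's work
        x.record_stream(s2)    # tell the caching allocator x is also used on s2
    torch.cuda.synchronize()
    return float(y[0])
```

The host thread never blocks between the two stream contexts; only the GPU-side ordering is enforced.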
## Performance Considerations

### Stream Concurrency
The number of concurrent streams depends on your hardware:
- GPU compute capability: Higher capability GPUs support more concurrent kernels
- Memory bandwidth: Multiple streams share memory bandwidth
- Model complexity: Larger models may limit concurrency
### Optimal Stream Count
```python
# Check GPU properties
device = torch.cuda.current_device()
props = torch.cuda.get_device_properties(device)
print(f"Multi-processor count: {props.multi_processor_count}")
print(f"Max threads per multi-processor: {props.max_threads_per_multi_processor}")

# Rule of thumb: start with 2-4 streams and benchmark
num_streams = min(4, props.multi_processor_count // 2)
```
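Given a chosen `num_streams`, a simple pool with round-robin assignment might look like this (the helper name is illustrative):

```python
import torch
from itertools import cycle

def make_stream_pool(num_streams: int):
    """Build a pool of CUDA streams plus a round-robin assigner.
    Returns (streams, assign), or (None, None) without a GPU."""
    if not torch.cuda.is_available():
        return None, None
    streams = [torch.cuda.Stream() for _ in range(num_streams)]
    rr = cycle(streams)
    return streams, lambda: next(rr)  # assign() yields streams in rotation
```

Each engine could then be constructed as `LRE("model.onnx", cuda_stream=assign().cuda_stream)`, matching the constructor shown earlier.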
## Common Pitfalls

### 1. Stream Assignment

Ensure tensors are produced and consumed on the same stream:
```python
# Incorrect: tensor produced on one stream, consumed on another
with torch.cuda.stream(stream1):
    tensor = torch.randn(1, 3, 640, 640, device="cuda")
with torch.cuda.stream(stream2):
    output = lre(tensor)  # may race or force synchronization

# Correct: all operations on the same stream
with torch.cuda.stream(stream1):
    tensor = torch.randn(1, 3, 640, 640, device="cuda")
    output = lre(tensor)
```
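When a tensor genuinely must cross streams, the standard PyTorch idiom is `wait_stream` plus `record_stream` rather than relying on implicit synchronization. A sketch (using `t * 2` as a stand-in for an LRE call):

```python
import torch

def cross_stream_handoff():
    """Safely consume on stream2 a tensor produced on stream1.
    Returns the output shape, or None without a GPU."""
    if not torch.cuda.is_available():
        return None
    stream1, stream2 = torch.cuda.Stream(), torch.cuda.Stream()
    with torch.cuda.stream(stream1):
        t = torch.randn(1, 3, 640, 640, device="cuda")
    stream2.wait_stream(stream1)       # order stream2 after stream1's queued work
    with torch.cuda.stream(stream2):
        out = t * 2                    # e.g. lre(t) in the real pipeline
        t.record_stream(stream2)       # keep the allocator from reusing t too early
    torch.cuda.synchronize()
    return out.shape
```

`record_stream` matters because `t`'s memory was allocated under `stream1`; without it, the caching allocator may hand that memory out again before `stream2` is finished reading it.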
### 2. Premature Synchronization

Avoid unnecessary synchronization that blocks parallelism:
```python
# Avoid: synchronizing inside the loop stalls the host every iteration
for i in range(iterations):
    with torch.cuda.stream(stream):
        output = lre(tensor)
    torch.cuda.synchronize()  # blocks until all streams are idle

# Better: queue all iterations, then synchronize once
with torch.cuda.stream(stream):
    for i in range(iterations):
        output = lre(tensor)
torch.cuda.synchronize()  # single synchronization
```
### 3. Resource Contention
Monitor for resource contention between streams:
```python
# Use the PyTorch profiler to identify bottlenecks across streams
with torch.profiler.profile(
    activities=[torch.profiler.ProfilerActivity.CUDA],
    schedule=torch.profiler.schedule(wait=1, warmup=1, active=3),
    on_trace_ready=torch.profiler.tensorboard_trace_handler("./log"),
) as prof:
    for step in range(5):
        # Your streaming operations for one step go here
        prof.step()  # required when a schedule is used
```
## Troubleshooting

### Memory Issues
```python
# Approximate free memory from this process's allocations
free_memory = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated(0)
print(f"Free GPU memory: {free_memory / 1024**3:.2f} GB")

# More accurate: ask the driver directly (accounts for other processes too)
free_bytes, total_bytes = torch.cuda.mem_get_info(0)
print(f"Driver-reported free memory: {free_bytes / 1024**3:.2f} GB")
```
### Stream Debugging

```python
# Enable synchronous kernel launches for debugging
# (must be set before the first CUDA call in the process)
import os
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
```
### Performance Monitoring

```shell
# Profile with Nsight Systems (nvprof is deprecated on recent GPUs)
nsys profile python your_script.py

# Live GPU utilization and memory monitoring
nvidia-smi dmon -s mu
```
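For in-process measurements, CUDA events attached to a specific stream avoid a device-wide `torch.cuda.synchronize()`. A sketch (the helper name is illustrative):

```python
import torch

def time_stream(fn, stream, iters: int = 10):
    """Time `fn` on `stream` with CUDA events; returns ms per iteration,
    or None without a GPU."""
    if not torch.cuda.is_available():
        return None
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    with torch.cuda.stream(stream):
        start.record(stream)
        for _ in range(iters):
            fn()
        end.record(stream)
    end.synchronize()  # block only until this stream reaches the end event
    return start.elapsed_time(end) / iters
```

Because only the end event is synchronized, other streams keep running while you measure one of them.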
This guide provides the foundation for using CUDA streams effectively with LatentRuntimeEngine. Start with simple scenarios and gradually increase complexity as you become comfortable with stream management.