OCS-CI Logging Guide for Contributors
This guide explains how to use logging effectively in the OCS-CI framework. Following these guidelines ensures consistent, readable logs that make debugging easier for everyone.
Table of Contents
Quick Start
Getting the Logger
All code should use the standard Python logging pattern:
import logging
logger = logging.getLogger(__name__)
The framework automatically provides a custom OCSCILogger with extended log levels.
Important: The logger creation statement should be placed immediately after import statements, before any other code.
Best Practice
# test_example.py
import logging
import pytest
from ocs_ci.framework.testlib import tier1, ManageTest
from ocs_ci.ocs.resources.pvc import create_pvc
logger = logging.getLogger(__name__) # First line after imports
@tier1
class TestExample(ManageTest):
"""Example test class"""
def test_something(self):
logger.info("Test starting")
# Test code here
Why Use __name__?
Using logging.getLogger(__name__) creates a logger named after the module (e.g., tests.functional.test_pvc or ocs_ci.helpers.helpers). This:
Creates a hierarchical logger structure
Makes it easy to filter logs by module
Helps identify the source of log messages
Follows Python logging best practices
What NOT to Do
# ❌ Wrong: Using a custom string
logger = logging.getLogger("my_logger")
# ❌ Wrong: Getting the root logger
logger = logging.getLogger()
# ❌ Wrong: Creating logger inside functions
def my_function():
logger = logging.getLogger(__name__) # Creates logger on every call
logger.info("Message")
# ❌ Wrong: Placing logger creation after other code
import logging
from ocs_ci.framework.testlib import tier1
CONSTANT = "value"
def helper_function():
return "data"
logger = logging.getLogger(__name__) # Should be right after imports
Available Log Levels
From highest to lowest severity:
Level |
Value |
Method |
Use For |
|---|---|---|---|
CRITICAL |
50 |
|
System failures, framework crashes |
ERROR |
40 |
|
Test failures, exceptions, errors |
WARNING |
30 |
|
Deprecations, retries, potential issues |
TEST_STEP |
25 |
|
Major test workflow phases |
INFO |
20 |
|
General progress, operations |
ASSERTION |
15 |
|
Test validations and assertions |
DEBUG |
10 |
|
Detailed diagnostic info |
AI_DATA |
5 |
|
ML predictions, metrics, analysis |
Log Level Selection Guide
CRITICAL (50) - System Failures
When to use:
Framework initialization failures
Unrecoverable cluster state
Critical resource exhaustion
Infrastructure failures requiring immediate attention
Examples:
# Framework initialization
try:
initialize_framework()
except Exception as e:
logger.critical(f"Framework initialization failed: {e}")
raise
# Critical resource state
if not check_cluster_api_accessible():
logger.critical("Cluster API is unreachable - cannot continue testing")
sys.exit(1)
# OCP/OCS critical failures
if storage_cluster_state == "Failed":
logger.critical(
f"StorageCluster in Failed state: {get_cluster_conditions()}"
)
Output:
CRITICAL - Framework initialization failed: Cannot load config from /path/to/config
CRITICAL - Cluster API is unreachable - cannot continue testing
CRITICAL - StorageCluster in Failed state: Phase=Failed, Message='Ceph cluster down'
ERROR (40) - Test Failures & Exceptions
When to use:
Test failures (caught by pytest)
Exceptions during operations
Resource creation/deletion failures
Validation failures that block progress
Examples:
# Resource creation failure
try:
pvc = create_pvc(sc_name="ocs-storagecluster-ceph-rbd", size="10Gi")
except Exception as e:
logger.error(f"Failed to create PVC: {e}")
raise
# Timeout errors
if not wait_for_resource_state(pvc, "Bound", timeout=300):
logger.error(
f"PVC {pvc.name} did not reach Bound state after 300s. "
f"Current state: {pvc.status.phase}"
)
raise TimeoutError("PVC creation timeout")
# Deployment failures
if deployment_failed:
logger.error(
f"Cluster deployment failed at stage '{current_stage}': {error_msg}"
)
logger.error(f"Deployment logs: {get_deployment_logs()}")
Output:
ERROR - Failed to create PVC: API server returned 500: Internal Server Error
ERROR - PVC test-pvc-rbd did not reach Bound state after 300s. Current state: Pending
ERROR - Cluster deployment failed at stage 'bootstrap': timeout waiting for bootstrap complete
WARNING (30) - Potential Issues
When to use:
Deprecated features/parameters
Retry attempts
Non-critical misconfigurations
Performance concerns (not failures)
Skipped optional operations
Examples:
# Deprecation warnings
def old_function(param):
logger.warning(
"old_function() is deprecated and will be removed in v5.0. "
"Use new_function() instead."
)
return new_function(param)
# Retry attempts
for attempt in range(1, max_retries + 1):
try:
return perform_operation()
except TemporaryError as e:
logger.warning(
f"Operation failed (attempt {attempt}/{max_retries}): {e}. "
f"Retrying in {retry_delay}s..."
)
time.sleep(retry_delay)
# Configuration issues
if not config.get("optimal_setting"):
logger.warning(
"optimal_setting not configured. Using default value which may "
"impact performance. Consider setting in your ocsci-conf file."
)
# Resource constraints
current_memory = get_available_memory()
if current_memory < recommended_memory:
logger.warning(
f"Available memory ({current_memory}GB) is below recommended "
f"({recommended_memory}GB). Tests may run slower."
)
# Skipped operations
if not managed_service:
logger.warning("Skipping CephFS replica check on non-managed deployments")
return
Output:
WARNING - old_function() is deprecated and will be removed in v5.0. Use new_function() instead.
WARNING - Operation failed (attempt 2/3): Connection timeout. Retrying in 5s...
WARNING - optimal_setting not configured. Using default value which may impact performance.
WARNING - Available memory (8GB) is below recommended (16GB). Tests may run slower.
TEST_STEP (25) - Test Workflow Phases
When to use:
Major phases in test execution
Setup/teardown operations
Deployment stages
Workflow checkpoints
Key features:
Automatic step numbering per logger
Helps visualize test flow
Makes logs scannable
Examples:
# Test workflow
def test_pvc_snapshot_restore(self):
"""Test PVC snapshot creation and restoration"""
logger.test_step("Create source PVC and write test data")
source_pvc = create_pvc(size="5Gi")
write_test_data(source_pvc, pattern="random", size="1Gi")
original_checksum = calculate_checksum(source_pvc)
logger.test_step("Create volume snapshot")
snapshot = create_snapshot(source_pvc)
wait_for_snapshot_ready(snapshot, timeout=300)
logger.test_step("Restore snapshot to new PVC")
restored_pvc = restore_from_snapshot(snapshot, size="5Gi")
wait_for_pvc_bound(restored_pvc, timeout=300)
logger.test_step("Verify data integrity in restored PVC")
restored_checksum = calculate_checksum(restored_pvc)
assert original_checksum == restored_checksum, "Data mismatch after restore"
logger.test_step("Cleanup test resources")
delete_pvc(restored_pvc)
delete_snapshot(snapshot)
delete_pvc(source_pvc)
# Setup/teardown
def setup_method(self):
"""Setup test environment"""
logger.test_step("Create dedicated test namespace")
self.namespace = create_namespace(name="test-mcg")
logger.test_step("Deploy MCG resources")
deploy_mcg_components(namespace=self.namespace)
logger.test_step("Configure S3 test credentials")
self.s3_creds = create_s3_credentials()
def teardown_method(self):
"""Cleanup test environment"""
logger.test_step("Delete test buckets and objects")
cleanup_s3_resources(self.s3_creds)
logger.test_step("Remove MCG resources")
delete_mcg_components(namespace=self.namespace)
logger.test_step("Delete test namespace")
delete_namespace(self.namespace)
# Deployment stages
def deploy_ocp_cluster(self):
"""Deploy OpenShift cluster"""
logger.test_step("Generate cluster installation manifests")
generate_manifests(cluster_name=self.cluster_name)
logger.test_step("Deploy bootstrap node")
deploy_bootstrap_node()
logger.test_step("Deploy control plane nodes")
deploy_control_plane(node_count=3)
logger.test_step("Wait for bootstrap complete")
wait_for_bootstrap_complete(timeout=1800)
logger.test_step("Deploy worker nodes")
deploy_worker_nodes(node_count=3)
logger.test_step("Finalize cluster installation")
finalize_installation()
# Complex multi-phase test
def test_cluster_upgrade(self):
"""Test ODF cluster upgrade from 4.15 to 4.16"""
logger.test_step("Verify pre-upgrade cluster health")
verify_cluster_health()
verify_all_pods_running()
logger.test_step("Backup current cluster configuration")
backup = create_cluster_backup()
logger.test_step("Update operator subscription channel")
update_subscription(channel="stable-4.16")
logger.test_step("Monitor operator upgrade progress")
wait_for_operator_upgrade(timeout=1800)
logger.test_step("Wait for all pods to be updated")
wait_for_pod_rollout_complete(timeout=1800)
logger.test_step("Verify post-upgrade cluster health")
verify_cluster_health()
verify_storage_functionality()
logger.test_step("Run post-upgrade validation suite")
run_smoke_tests()
Output:
TEST_STEP 1 - Create source PVC and write test data
TEST_STEP 2 - Create volume snapshot
TEST_STEP 3 - Restore snapshot to new PVC
TEST_STEP 4 - Verify data integrity in restored PVC
TEST_STEP 5 - Cleanup test resources
INFO (20) - General Progress
When to use:
Successful operations
High-level progress milestones (starting, completion, major checkpoints)
State changes
Resource creation/deletion (successful)
Configuration details
Most day-to-day logging
Note: Use DEBUG for iteration-level details within loops. INFO should mark the start, completion, or major milestones of an operation, not each iteration.
Examples:
# Resource operations
def create_pvc(name, size, sc_name):
logger.info(
f"Creating PVC '{name}': size={size}, storage_class={sc_name}"
)
pvc = PVC(name=name, size=size, storage_class=sc_name)
pvc.create()
logger.info(f"PVC '{name}' created successfully")
return pvc
# High-level progress milestones
def wait_for_pods_ready(namespace, timeout=300):
logger.info(
f"Waiting for all pods in namespace '{namespace}' to be ready "
f"(timeout: {timeout}s)"
)
start_time = time.time()
while time.time() - start_time < timeout:
pods = get_pods(namespace)
ready_count = sum(1 for p in pods if p.is_ready())
# Use DEBUG for iteration details
logger.debug(
f"Pod readiness check: {ready_count}/{len(pods)} pods ready"
)
if ready_count == len(pods):
logger.info(f"All {len(pods)} pods ready")
return True
time.sleep(10)
logger.warning(
f"Timeout waiting for pods in '{namespace}': "
f"{ready_count}/{len(pods)} ready after {timeout}s"
)
return False
# Configuration
def setup_cluster_monitoring(config):
logger.info("Configuring cluster monitoring")
logger.info(f"Retention period: {config['retention_days']} days")
logger.info(f"Storage size: {config['storage_size']}")
logger.info(f"Storage class: {config['storage_class']}")
apply_monitoring_config(config)
logger.info("Cluster monitoring configured successfully")
# State changes
def scale_deployment(deployment_name, replicas):
current_replicas = get_deployment_replicas(deployment_name)
logger.info(
f"Scaling deployment '{deployment_name}': "
f"{current_replicas} -> {replicas} replicas"
)
scale(deployment_name, replicas)
logger.info(f"Deployment '{deployment_name}' scaled successfully")
Output:
INFO - Creating PVC 'test-pvc-rbd': size=10Gi, storage_class=ocs-storagecluster-ceph-rbd
INFO - PVC 'test-pvc-rbd' created successfully
INFO - Waiting for all pods in namespace 'openshift-storage' to be ready (timeout: 300s)
INFO - All 12 pods ready
INFO - Configuring cluster monitoring
INFO - Retention period: 7 days
INFO - Storage size: 100Gi
INFO - Storage class: ocs-storagecluster-ceph-rbd
INFO - Cluster monitoring configured successfully
INFO - Scaling deployment 'rook-ceph-mon': 3 -> 5 replicas
INFO - Deployment 'rook-ceph-mon' scaled successfully
ASSERTION (15) - Test Validations
When to use:
Before/after Python assertions
Test checkpoint validations
Expected vs actual comparisons
Pass/fail validations
Examples:
# Basic assertion logging
def test_pvc_creation(self):
pvc = create_pvc(size="10Gi")
logger.assertion(
f"PVC status: expected='Bound', actual='{pvc.status}'"
)
assert pvc.status == "Bound", f"PVC not bound: {pvc.status}"
# Numeric comparisons
def test_performance_threshold(self):
iops = measure_iops()
min_iops = 1000
logger.assertion(
f"IOPS check: measured={iops}, minimum={min_iops}, "
f"passed={iops >= min_iops}"
)
assert iops >= min_iops, f"IOPS below threshold: {iops} < {min_iops}"
# Multiple conditions
def test_cluster_health(self):
health_checks = {
"operators_ready": check_operators(),
"storage_cluster_healthy": check_storage_cluster(),
"ceph_healthy": check_ceph_health(),
"all_pods_running": check_all_pods_ready(),
}
for check_name, result in health_checks.items():
logger.assertion(f"{check_name}: {result}")
assert result, f"Health check failed: {check_name}"
# List/collection validations
def test_pod_count(self):
pods = get_pods_in_namespace("openshift-storage")
expected_count = 12
logger.assertion(
f"Pod count: expected={expected_count}, actual={len(pods)}, "
f"match={len(pods) == expected_count}"
)
assert len(pods) == expected_count, \
f"Expected {expected_count} pods, found {len(pods)}"
# Validation with tolerance
def test_capacity_usage(self):
usage_pct = get_cluster_capacity_usage()
expected = 75.0
tolerance = 5.0
within_tolerance = abs(usage_pct - expected) <= tolerance
logger.assertion(
f"Capacity usage: measured={usage_pct}%, expected={expected}%, "
f"tolerance=±{tolerance}%, passed={within_tolerance}"
)
assert within_tolerance, \
f"Usage {usage_pct}% outside expected range {expected}±{tolerance}%"
Output:
ASSERTION - PVC status: expected='Bound', actual='Bound'
ASSERTION - IOPS check: measured=1250, minimum=1000, passed=True
ASSERTION - operators_ready: True
ASSERTION - storage_cluster_healthy: True
ASSERTION - Pod count: expected=12, actual=12, match=True
ASSERTION - Capacity usage: measured=73.5%, expected=75%, tolerance=±5%, passed=True
DEBUG (10) - Detailed Diagnostics
When to use:
Internal function flow
Variable values during debugging
API request/response details
Detailed resource states
Iteration-level details within loops (not completion/start)
Helper function internals
Note: Use DEBUG for per-iteration updates in loops. Use INFO to mark when the overall operation starts or completes.
Examples:
# Function flow
def create_and_attach_pvc(pod_name, pvc_name, size):
logger.debug(f"Entering create_and_attach_pvc: pod={pod_name}, pvc={pvc_name}")
logger.debug(f"Creating PVC with size={size}")
pvc = create_pvc(pvc_name, size)
logger.debug(f"PVC object created: {pvc.to_dict()}")
logger.debug(f"Attaching PVC to pod {pod_name}")
attach_pvc_to_pod(pod_name, pvc)
logger.debug("Attachment successful")
return pvc
# API interactions
def get_storage_cluster():
logger.debug("Fetching StorageCluster resource")
logger.debug(f"API endpoint: {API_BASE}/odf.openshift.io/v1/storageclusters")
response = api_client.get(
"/apis/odf.openshift.io/v1/storageclusters",
namespace="openshift-storage"
)
logger.debug(f"API response status: {response.status_code}")
logger.debug(f"API response body: {response.json()}")
return response.json()
# Variable inspection
def calculate_required_nodes(capacity_gb, node_size_gb):
logger.debug(f"Calculating required nodes: capacity={capacity_gb}GB, node_size={node_size_gb}GB")
nodes_needed = math.ceil(capacity_gb / node_size_gb)
logger.debug(f"Raw calculation: {capacity_gb} / {node_size_gb} = {nodes_needed}")
# Add replica overhead
with_replica = nodes_needed * 3
logger.debug(f"With 3x replica: {with_replica}")
# Add headroom
final_count = math.ceil(with_replica * 1.2)
logger.debug(f"With 20% headroom: {final_count}")
return final_count
# Iteration-level details (use INFO for start/completion, DEBUG for iterations)
def wait_for_resource(resource_name, desired_state, timeout=300):
logger.info(
f"Waiting for resource '{resource_name}' to reach '{desired_state}' "
f"(timeout: {timeout}s)"
)
start_time = time.time()
iteration = 0
while time.time() - start_time < timeout:
iteration += 1
current_state = get_resource_state(resource_name)
# DEBUG for each iteration
logger.debug(
f"Wait iteration {iteration}: current_state={current_state}, "
f"elapsed={int(time.time() - start_time)}s"
)
if current_state == desired_state:
logger.info(
f"Resource '{resource_name}' reached '{desired_state}' "
f"after {int(time.time() - start_time)}s"
)
return True
time.sleep(5)
logger.warning(
f"Timeout waiting for resource '{resource_name}': "
f"expected='{desired_state}', actual='{current_state}', timeout={timeout}s"
)
return False
# Detailed resource state
def verify_storage_cluster():
sc = get_storage_cluster()
logger.debug(f"StorageCluster phase: {sc.status.phase}")
logger.debug(f"StorageCluster conditions: {sc.status.conditions}")
logger.debug(f"OSD count: {len(sc.status.osd_deployment_status)}")
for osd in sc.status.osd_deployment_status:
logger.debug(f"OSD {osd.name}: state={osd.state}, node={osd.node}")
Output:
DEBUG - Entering create_and_attach_pvc: pod=test-pod, pvc=test-pvc
DEBUG - Creating PVC with size=10Gi
DEBUG - PVC object created: {'kind': 'PersistentVolumeClaim', 'metadata': {...}}
DEBUG - Fetching StorageCluster resource
DEBUG - API response status: 200
DEBUG - Wait iteration 5: current_state=Pending, elapsed=25s
AI_DATA (5) - ML/AI Information
When to use:
Machine learning predictions
AI-generated insights
Model inference details
Anomaly detection results
Performance predictions
Training/inference metrics
Note: This level is below DEBUG and requires explicit enabling. Only use for AI/ML features.
Examples:
# Failure prediction
def predict_disk_failure(disk_id):
features = extract_disk_features(disk_id)
logger.ai_data(f"Model input features: {features}")
prediction = ml_model.predict(features)
probability = prediction['failure_probability']
logger.ai_data(
f"Disk failure prediction: disk={disk_id}, "
f"probability={probability:.2%}, model=disk_health_v2"
)
if probability > 0.8:
logger.warning(
f"High disk failure probability detected: {disk_id} ({probability:.2%})"
)
return prediction
# Anomaly detection
def detect_performance_anomaly(metrics):
baseline = get_baseline_metrics()
z_score = calculate_z_score(metrics, baseline)
logger.ai_data(
f"Performance metrics: current={metrics}, baseline={baseline}, "
f"z_score={z_score:.2f}"
)
if z_score > 2.0:
logger.ai_data(
f"Anomaly detected: z_score={z_score:.2f} exceeds threshold=2.0"
)
logger.warning(
f"Performance anomaly: latency {z_score:.1f}σ above baseline"
)
# Cluster scaling recommendation
def recommend_cluster_scale():
current_utilization = get_cluster_utilization()
predicted_peak = predict_peak_utilization()
logger.ai_data(
f"Utilization analysis: current={current_utilization:.1%}, "
f"predicted_peak={predicted_peak:.1%}"
)
if predicted_peak > 0.9:
recommendation = calculate_node_recommendation()
logger.ai_data(
f"Scale recommendation: add {recommendation['nodes']} nodes "
f"(confidence={recommendation['confidence']:.2%})"
)
logger.info(
f"Cluster scale recommended: add {recommendation['nodes']} nodes "
f"to handle predicted load"
)
# Test runtime prediction
def predict_test_runtime(test_name):
historical_data = get_test_history(test_name)
logger.ai_data(f"Historical runtimes: {historical_data}")
prediction = time_series_model.predict(test_name)
logger.ai_data(
f"Test runtime prediction: test={test_name}, "
f"predicted={prediction['duration']}min, "
f"confidence_interval=±{prediction['std']}min, "
f"model=runtime_predictor_v1"
)
return prediction
Output:
AI_DATA - Disk failure prediction: disk=sda, probability=85.3%, model=disk_health_v2
AI_DATA - Performance metrics: current={'latency': 150}, baseline={'latency': 50}, z_score=3.20
AI_DATA - Anomaly detected: z_score=3.20 exceeds threshold=2.0
AI_DATA - Scale recommendation: add 2 nodes (confidence=94.2%)
Usage by Code Type
Test Files (tests/)
Tests should use TEST_STEP and ASSERTION liberally to make test flow clear:
import logging
import pytest
from ocs_ci.framework.testlib import tier1, ManageTest
from ocs_ci.ocs.resources.pvc import create_pvc, delete_pvc
logger = logging.getLogger(__name__)
@tier1
@pytest.mark.polarion_id("OCS-1234")
class TestPVCLifecycle(ManageTest):
"""Test PVC lifecycle operations"""
def test_pvc_create_delete(self):
"""
Test PVC creation and deletion
Steps:
1. Create PVC
2. Verify PVC is bound
3. Delete PVC
4. Verify PVC is removed
"""
pvc_name = "test-pvc-lifecycle"
logger.test_step("Create PVC with RBD storage class")
pvc = create_pvc(
name=pvc_name,
size="10Gi",
storage_class="ocs-storagecluster-ceph-rbd"
)
logger.test_step("Verify PVC reaches Bound state")
pvc.reload()
logger.assertion(
f"PVC status: expected='Bound', actual='{pvc.status}'"
)
assert pvc.status == "Bound", f"PVC not bound: {pvc.status}"
logger.test_step("Delete PVC")
delete_pvc(pvc)
logger.test_step("Verify PVC is removed from cluster")
deleted = verify_pvc_deleted(pvc_name, timeout=60)
logger.assertion(
f"PVC deletion: pvc={pvc_name}, deleted={deleted}"
)
assert deleted, f"PVC {pvc_name} still exists after deletion"
Deployment Code (ocs_ci/deployment/)
Deployment code should use TEST_STEP for major phases, INFO for progress:
import logging
from ocs_ci.deployment.helpers import create_manifest, run_installer
logger = logging.getLogger(__name__)
class OCPDeployment:
"""OpenShift cluster deployment"""
def deploy(self):
"""Deploy OpenShift cluster"""
logger.test_step("Validate deployment prerequisites")
self._validate_prerequisites()
logger.test_step("Generate installation manifests")
manifest_dir = self._generate_manifests()
logger.info(f"Manifests generated in: {manifest_dir}")
logger.test_step("Deploy infrastructure")
self._deploy_infrastructure()
logger.test_step("Bootstrap cluster")
self._bootstrap_cluster()
logger.test_step("Complete installation")
self._complete_installation()
logger.info("Cluster deployment completed successfully")
def _validate_prerequisites(self):
"""Validate prerequisites"""
logger.info("Checking pull secret")
if not self.pull_secret_exists():
logger.error("Pull secret not found")
raise FileNotFoundError("Missing pull secret")
logger.info("Checking cloud credentials")
if not self.credentials_valid():
logger.error("Cloud credentials invalid or missing")
raise ValueError("Invalid credentials")
logger.debug(f"Using region: {self.region}")
logger.debug(f"Using instance type: {self.instance_type}")
def _deploy_infrastructure(self):
"""Deploy infrastructure resources"""
logger.info("Creating VPC")
self.vpc_id = self.cloud_provider.create_vpc()
logger.info(f"VPC created: {self.vpc_id}")
logger.info("Creating subnets")
self.subnets = self.cloud_provider.create_subnets(self.vpc_id)
logger.info(f"Created {len(self.subnets)} subnets")
logger.debug(f"Subnet IDs: {[s.id for s in self.subnets]}")
Helper Functions (ocs_ci/helpers/)
Helpers should use INFO for operations, DEBUG for details:
import logging
from ocs_ci.ocs.resources.pod import get_pods
from ocs_ci.ocs import constants
logger = logging.getLogger(__name__)
def wait_for_pods_ready(namespace, pod_count=None, timeout=300):
"""
Wait for pods to be ready in namespace
Args:
namespace (str): Namespace name
pod_count (int): Expected pod count (optional)
timeout (int): Timeout in seconds
Returns:
bool: True if all pods ready, False otherwise
"""
logger.info(
f"Waiting for pods in namespace '{namespace}' (timeout: {timeout}s)"
)
if pod_count:
logger.debug(f"Expecting {pod_count} pods")
start_time = time.time()
iteration = 0
while time.time() - start_time < timeout:
iteration += 1
pods = get_pods(namespace)
ready_count = sum(1 for p in pods if p.is_ready())
logger.debug(
f"Iteration {iteration}: {ready_count}/{len(pods)} pods ready"
)
if pod_count and len(pods) != pod_count:
logger.debug(
f"Pod count mismatch: expected={pod_count}, actual={len(pods)}"
)
time.sleep(5)
continue
if ready_count == len(pods):
elapsed = int(time.time() - start_time)
logger.info(f"All {len(pods)} pods ready after {elapsed}s")
return True
time.sleep(5)
logger.warning(
f"Timeout waiting for pods in '{namespace}': "
f"{ready_count}/{len(pods)} ready after {timeout}s"
)
return False
def get_cluster_capacity():
"""
Get cluster storage capacity information
Returns:
dict: Capacity information
"""
logger.debug("Fetching cluster capacity")
storage_cluster = get_storage_cluster()
logger.debug(f"StorageCluster name: {storage_cluster.name}")
capacity_info = {
'total': storage_cluster.status.capacity.total,
'used': storage_cluster.status.capacity.used,
'available': storage_cluster.status.capacity.available,
}
logger.debug(f"Capacity: {capacity_info}")
usage_pct = (capacity_info['used'] / capacity_info['total']) * 100
logger.info(f"Cluster capacity usage: {usage_pct:.1f}%")
return capacity_info
Utilities (ocs_ci/utility/)
Utilities should focus on INFO for user-facing operations:
import logging
import sys
from ocs_ci.framework import config
from ocs_ci.ocs.resources.storage_cluster import get_storage_cluster
logger = logging.getLogger(__name__)
def report_cluster_version():
"""
Report OCS/ODF cluster version information
Command-line utility to display cluster versions
"""
logger.info("Collecting cluster version information...")
try:
storage_cluster = get_storage_cluster()
except Exception as e:
logger.error(f"Failed to get StorageCluster: {e}")
sys.exit(1)
logger.debug(f"StorageCluster object: {storage_cluster.name}")
version_info = {
'ODF': storage_cluster.get_odf_version(),
'Ceph': storage_cluster.get_ceph_version(),
'NooBaa': storage_cluster.get_noobaa_version(),
}
logger.info("=== Cluster Version Information ===")
for component, version in version_info.items():
logger.info(f"{component:15s}: {version}")
logger.debug("Version reporting complete")
def cleanup_test_resources(namespace):
"""
Cleanup test resources in namespace
Args:
namespace (str): Namespace to clean
"""
logger.info(f"Starting cleanup of namespace: {namespace}")
logger.info("Deleting PVCs...")
pvcs = get_all_pvcs(namespace)
logger.info(f"Found {len(pvcs)} PVCs")
for pvc in pvcs:
logger.debug(f"Deleting PVC: {pvc.name}")
try:
delete_pvc(pvc)
except Exception as e:
logger.warning(f"Failed to delete PVC {pvc.name}: {e}")
logger.info("Deleting pods...")
pods = get_all_pods(namespace)
logger.info(f"Found {len(pods)} pods")
for pod in pods:
logger.debug(f"Deleting pod: {pod.name}")
try:
delete_pod(pod)
except Exception as e:
logger.warning(f"Failed to delete pod {pod.name}: {e}")
logger.info(f"Cleanup of namespace '{namespace}' completed")
Resource Classes (ocs_ci/ocs/resources/)
Resource classes should use DEBUG extensively, INFO for state changes:
import logging
from ocs_ci.ocs import constants
from ocs_ci.ocs.ocp import OCP
logger = logging.getLogger(__name__)
class PVC:
"""Represents a PersistentVolumeClaim"""
def __init__(self, name, namespace, size, storage_class):
self.name = name
self.namespace = namespace
self.size = size
self.storage_class = storage_class
logger.debug(
f"PVC object initialized: name={name}, namespace={namespace}, "
f"size={size}, sc={storage_class}"
)
def create(self):
"""Create the PVC"""
logger.info(
f"Creating PVC '{self.name}' in namespace '{self.namespace}'"
)
manifest = self._generate_manifest()
logger.debug(f"PVC manifest: {manifest}")
ocp = OCP(kind=constants.PVC, namespace=self.namespace)
ocp.create(manifest)
logger.info(f"PVC '{self.name}' created")
def reload(self):
"""Reload PVC state from cluster"""
logger.debug(f"Reloading PVC state: {self.name}")
ocp = OCP(kind=constants.PVC, namespace=self.namespace)
data = ocp.get(resource_name=self.name)
self.status = data['status']['phase']
logger.debug(f"PVC {self.name} status: {self.status}")
def wait_for_bound(self, timeout=300):
"""Wait for PVC to reach Bound state"""
logger.info(
f"Waiting for PVC '{self.name}' to be bound (timeout: {timeout}s)"
)
start_time = time.time()
while time.time() - start_time < timeout:
self.reload()
logger.debug(f"Current status: {self.status}")
if self.status == constants.STATUS_BOUND:
elapsed = int(time.time() - start_time)
logger.info(
f"PVC '{self.name}' bound after {elapsed}s"
)
return True
time.sleep(5)
logger.error(
f"PVC '{self.name}' did not reach Bound state after {timeout}s"
)
return False
Common Patterns
Pattern: Progressive Logging
Use different levels as operations progress:
def complex_operation():
"""Example of progressive logging"""
# High-level phase
logger.test_step("Execute complex operation")
# Operation details
logger.info("Starting sub-operation A")
logger.debug("Sub-operation A parameters: {...}")
result_a = sub_operation_a()
# Validation
logger.assertion(f"Sub-operation A result: {result_a}")
assert result_a == expected, "Operation A failed"
# Continue
logger.info("Starting sub-operation B")
result_b = sub_operation_b()
logger.info("Complex operation completed successfully")
Pattern: Retry Logging
Log retries clearly:
def retry_operation(max_attempts=3):
"""Operation with retry logic"""
for attempt in range(1, max_attempts + 1):
logger.debug(f"Attempt {attempt}/{max_attempts}")
try:
result = perform_operation()
logger.info(f"Operation succeeded on attempt {attempt}")
return result
except TemporaryError as e:
if attempt < max_attempts:
logger.warning(
f"Operation failed (attempt {attempt}/{max_attempts}): {e}. "
f"Retrying in 5s..."
)
time.sleep(5)
else:
logger.error(
f"Operation failed after {max_attempts} attempts: {e}"
)
raise
Pattern: Iteration Logging
Use INFO for start/completion, DEBUG for iteration details:
def wait_for_condition(condition_func, timeout=300):
"""Wait for condition with proper iteration logging"""
logger.info(f"Waiting for condition (timeout: {timeout}s)")
start_time = time.time()
iteration = 0
while time.time() - start_time < timeout:
iteration += 1
# DEBUG for each iteration
logger.debug(f"Checking condition (iteration {iteration})")
if condition_func():
elapsed = int(time.time() - start_time)
logger.info(f"Condition met after {elapsed}s")
return True
time.sleep(5)
logger.warning(f"Condition not met after {timeout}s")
return False
Pattern: TimeoutSampler Logging
Use TimeoutSampler for polling operations while avoiding repetitive logs:
from ocs_ci.utility.utils import TimeoutSampler
from ocs_ci.ocs.resources.pod import get_all_pods
from ocs_ci.ocs import constants
from ocs_ci.ocs.exceptions import TimeoutExpiredError
def wait_for_pods_ready(namespace, timeout=300):
"""
Wait for all pods to be ready using TimeoutSampler.
Demonstrates how to limit repetitive logs during polling operations.
"""
logger.info(f"Waiting for pods in '{namespace}' to be ready (timeout: {timeout}s)")
try:
# TimeoutSampler automatically rate-limits exception logs to once per minute
# Log only state changes, not every iteration
last_pod_count = None
last_ready_count = None
for pod_list in TimeoutSampler(
timeout=timeout,
sleep=10,
func=get_all_pods,
namespace=namespace
):
total_pods = len(pod_list)
ready_pods = sum(
1 for pod in pod_list
if pod.data["status"]["phase"] == constants.STATUS_RUNNING
)
# Log only when counts change (avoid duplicate logs)
if (total_pods, ready_pods) != (last_pod_count, last_ready_count):
logger.debug(
f"Pod status: {ready_pods}/{total_pods} ready"
)
last_pod_count = total_pods
last_ready_count = ready_pods
# Check condition
if total_pods > 0 and ready_pods == total_pods:
logger.info(f"All {total_pods} pods are ready")
return True
except TimeoutExpiredError:
logger.error(
f"Timeout waiting for pods in '{namespace}' to be ready after {timeout}s"
)
# Log final state for debugging
logger.error(f"Final status: {last_ready_count}/{last_pod_count} pods ready")
raise
Pattern: Conditional Logging
Use DEBUG for verbose details, INFO for summaries:
def process_items(items, verbose=False):
"""Process items with optional verbose logging"""
logger.info(f"Processing {len(items)} items")
for i, item in enumerate(items):
logger.debug(f"Processing item {i+1}/{len(items)}: {item.name}")
result = process_item(item)
if verbose or result.has_issues():
logger.info(f"Item {item.name}: {result.status}")
logger.info("All items processed")
Pattern: Error Context
Provide context in error logs:
def create_resource(config):
"""Create resource with error context"""
try:
logger.info(f"Creating resource: {config['name']}")
resource = Resource(config)
resource.create()
logger.info("Resource created successfully")
return resource
except APIError as e:
logger.error(
f"API error creating resource '{config['name']}': {e}"
)
logger.error(f"Config: {config}")
logger.error(f"API response: {e.response}")
raise
except Exception as e:
logger.error(
f"Unexpected error creating resource '{config['name']}': {e}",
exc_info=True # Include stack trace
)
raise
Pattern: Step Numbering in Loops
Reset step counters for independent iterations:
from ocs_ci.framework.custom_logger import reset_step_counts
def test_multiple_iterations(self):
"""Test with multiple independent iterations"""
for iteration in range(3):
logger.info(f"=== Iteration {iteration + 1}/3 ===")
# Reset step counter for clean numbering
reset_step_counts(__name__)
logger.test_step("Setup iteration resources")
setup_resources()
logger.test_step("Run iteration workload")
run_workload()
logger.test_step("Validate iteration results")
validate_results()
logger.test_step("Cleanup iteration resources")
cleanup_resources()
Anti-Patterns to Avoid
❌ Don’t: Excessive TEST_STEP Usage
# TOO GRANULAR - clutters logs
def test_bad_example(self):
logger.test_step("Import libraries") # ❌
logger.test_step("Define variables") # ❌
logger.test_step("Create PVC object") # ❌
pvc = PVC()
logger.test_step("Set PVC name") # ❌
pvc.name = "test"
✅ Do: Use steps for meaningful phases
def test_good_example(self):
logger.test_step("Create and configure PVC")
pvc = PVC(name="test", size="10Gi")
logger.test_step("Deploy PVC to cluster")
pvc.create()
❌ Don’t: Redundant Messages
# Redundant with function name
def deploy_cluster(self):
logger.test_step("Deploy cluster") # ❌ Adds no info
✅ Do: Add specific details
def deploy_cluster(self):
logger.test_step("Deploy 3-node cluster with ODF storage") # ✅
❌ Don’t: Logging Secrets
# NEVER log credentials
logger.info(f"Using password: {password}") # ❌❌❌
logger.debug(f"API key: {api_key}") # ❌❌❌
logger.info(f"Config: {config}") # ❌ May contain secrets
✅ Do: Sanitize sensitive data
logger.info("Authentication configured") # ✅
logger.debug(f"API endpoint: {endpoint}") # ✅
logger.debug(f"Username: {username}") # ✅ (username OK, not password)
❌ Don’t: String Formatting Waste
# Expensive string formatting even when DEBUG disabled
logger.debug(f"Large object: {expensive_operation()}") # ❌
✅ Do: Use lazy formatting for DEBUG
# Only formats if DEBUG enabled
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Large object: {expensive_operation()}") # ✅
❌ Don’t: Generic Messages
logger.info("Processing") # ❌ What processing?
logger.error("Failed") # ❌ What failed?
logger.warning("Issue") # ❌ What issue?
✅ Do: Be specific
logger.info("Processing 50 PVC creation requests") # ✅
logger.error("Failed to create StorageCluster: API timeout") # ✅
logger.warning("Cluster utilization at 85% - approaching capacity") # ✅
❌ Don’t: Wrong Log Level
# Using INFO for debug details
logger.info(f"Variable x = {x}, y = {y}, z = {z}") # ❌
# Using ERROR for warnings
logger.error("Using default configuration") # ❌
# Using DEBUG for important progress
logger.debug("Cluster deployment completed") # ❌
✅ Do: Use appropriate levels
logger.debug(f"Variable x = {x}, y = {y}, z = {z}") # ✅
logger.warning("Using default configuration") # ✅
logger.info("Cluster deployment completed") # ✅
❌ Don’t: Framing Log Messages
# Using decorative lines to frame log messages
logger.info("=" * 80) # ❌
logger.info("Starting deployment")
logger.info("=" * 80) # ❌
logger.info("#" * 50) # ❌
logger.info("### Important Section ###") # ❌
logger.info("#" * 50) # ❌
logger.info("-" * 60) # ❌
logger.test_step("Run validation")
logger.info("-" * 60) # ❌
Why avoid framing?
Creates visual noise in logs that makes scanning harder
Wastes log lines without adding information
Breaks grep/search patterns
Inconsistent with log aggregation tools
TEST_STEP already provides visual structure
✅ Do: Use log levels and clear messages for structure
# Let TEST_STEP provide structure
logger.test_step("Starting deployment") # ✅
logger.info("Deploying control plane nodes")
logger.info("Deploying worker nodes")
# Use clear, informative messages
logger.info("=== Beginning validation phase ===") # ✅ If you must, include in message
logger.test_step("Run validation") # ✅ Better: use TEST_STEP
# Group related operations with consistent prefixes
logger.info("Validation: Checking cluster health") # ✅
logger.info("Validation: Verifying storage") # ✅
logger.info("Validation: Testing connectivity") # ✅
❌ Don’t: Start Log Messages with Newlines
# Starting log messages with newlines
logger.info("\nStarting deployment") # ❌
logger.error("\n\nFailed to create PVC") # ❌
logger.test_step("\nCreate resources") # ❌
# Multiple newlines for spacing
logger.info("\n\n\n=== Important Section ===") # ❌
Why avoid leading newlines?
Breaks log formatting and timestamp alignment
Creates empty lines in log files that waste space
Disrupts log aggregation and parsing tools
Makes automated log analysis difficult
Inconsistent with standard logging practices
Hard to grep/search for messages
✅ Do: Let the logging framework handle formatting
# Clean, properly formatted messages
logger.info("Starting deployment") # ✅
logger.error("Failed to create PVC") # ✅
logger.test_step("Create resources") # ✅
# Use TEST_STEP for visual separation
logger.test_step("Important Section") # ✅
# Or use clear section markers within messages
logger.info("=== Important Section ===") # ✅ If you must
What happens with leading newlines:
2026-04-22 10:30:15 INFO - Processing resources
2026-04-22 10:30:16 INFO -
Starting deployment # ❌ Timestamp and message separated
2026-04-22 10:30:17 INFO - Deployment in progress
Clean formatting without newlines:
2026-04-22 10:30:15 INFO - Processing resources
2026-04-22 10:30:16 INFO - Starting deployment # ✅ Clean alignment
2026-04-22 10:30:17 INFO - Deployment in progress
Special Topics
Assertions: Before or After?
Best practice: Log assertion BEFORE the assert statement
# ✅ Recommended: Log before assert
logger.assertion(f"PVC status: expected='Bound', actual='{pvc.status}'")
assert pvc.status == "Bound", f"PVC not bound: {pvc.status}"
# ❌ Less useful: Log after assert (might not execute)
assert pvc.status == "Bound", f"PVC not bound: {pvc.status}"
logger.assertion(f"PVC status check passed") # Never logs if assertion fails
Why? Logging before ensures the assertion context is captured even if the assert fails.
Exception Logging
Python’s logging module provides logger.exception() which automatically logs at ERROR level and includes the full traceback. This is the preferred method for logging exceptions.
Using logger.exception()
Best practice: Use logger.exception() in exception handlers
# ✅ Recommended: Use logger.exception() in except blocks
try:
risky_operation()
except Exception as e:
logger.exception(f"Operation failed: {e}")
raise
# Equivalent to:
try:
risky_operation()
except Exception as e:
logger.error(f"Operation failed: {e}", exc_info=True)
raise
Key points:
logger.exception()automatically includes the full tracebackShould only be called from exception handlers (inside
exceptblocks)Logs at ERROR level
Always includes
exc_info=Trueautomatically
Exception Logging Patterns
Pattern 1: Simple exception logging
def create_pvc(name, size):
"""Create PVC with exception handling"""
try:
pvc = PVC(name=name, size=size)
pvc.create()
logger.info(f"PVC '{name}' created successfully")
return pvc
except Exception as e:
logger.exception(f"Failed to create PVC '{name}'")
raise
Pattern 2: Specific exception types
def deploy_cluster():
"""Deploy cluster with specific exception handling"""
try:
initialize_cluster()
deploy_nodes()
configure_storage()
except TimeoutError as e:
logger.exception(f"Timeout during cluster deployment: {e}")
raise
except APIError as e:
logger.exception(f"API error during cluster deployment: {e}")
logger.error(f"API response: {e.response}")
raise
except Exception as e:
logger.exception(f"Unexpected error during cluster deployment: {e}")
raise
Pattern 3: Exception with context (no re-raise)
def cleanup_resources(resource_list):
"""Cleanup resources, continue on individual failures"""
failed = []
for resource in resource_list:
try:
delete_resource(resource)
logger.info(f"Deleted resource: {resource.name}")
except Exception as e:
# Log exception but don't re-raise - continue cleanup
logger.exception(
f"Failed to delete resource '{resource.name}': {e}"
)
failed.append(resource.name)
if failed:
logger.warning(f"Failed to delete {len(failed)} resources: {failed}")
return failed
Pattern 4: DEBUG vs ERROR exception logging
def optional_operation():
"""Operation that may fail without impacting main flow"""
try:
perform_optional_task()
except Exception as e:
# Use DEBUG for non-critical failures
logger.debug(f"Optional operation failed: {e}", exc_info=True)
# Continue execution
When to Use exc_info=True vs logger.exception()
# ✅ In except block: Use logger.exception()
try:
operation()
except Exception as e:
logger.exception("Operation failed") # Preferred
# ✅ Outside except block: Use exc_info=True with logger.error()
def some_function():
if error_condition:
logger.error("Error detected", exc_info=True)
# ✅ For DEBUG level with traceback
try:
operation()
except Exception as e:
logger.debug("Operation failed", exc_info=True) # Can't use .exception() for DEBUG
Complete Exception Handling Example
def test_cluster_upgrade(self):
"""Test cluster upgrade with comprehensive error handling"""
logger.test_step("Start cluster upgrade")
try:
# Pre-upgrade validation
logger.info("Running pre-upgrade validation")
validate_cluster_health()
# Perform upgrade
logger.info("Updating operator subscription")
update_subscription(channel="stable-4.16")
logger.info("Waiting for upgrade completion")
wait_for_upgrade_complete(timeout=3600)
# Post-upgrade validation
logger.info("Running post-upgrade validation")
validate_cluster_health()
logger.info("Cluster upgrade completed successfully")
except TimeoutError as e:
logger.exception(f"Upgrade timed out: {e}")
logger.error("Collecting must-gather for debugging")
collect_must_gather()
raise
except ValidationError as e:
logger.exception(f"Cluster validation failed: {e}")
logger.error(f"Cluster state: {get_cluster_state()}")
raise
except Exception as e:
logger.exception(f"Unexpected error during upgrade: {e}")
logger.error("Attempting to collect diagnostic information")
try:
collect_diagnostics()
except Exception as diag_error:
logger.warning(f"Could not collect diagnostics: {diag_error}")
raise
Output:
ERROR - Failed to create PVC 'test-pvc'
Traceback (most recent call last):
File "test.py", line 123, in create_pvc
pvc.create()
File "pvc.py", line 45, in create
response = api.post(...)
APIError: 500 Internal Server Error
Performance-Sensitive Code
Avoid expensive operations in log messages:
# ❌ Bad: Expensive operation runs even if DEBUG disabled
logger.debug(f"Data: {expensive_serialization(large_object)}")
# ✅ Good: Only runs if DEBUG enabled
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Data: {expensive_serialization(large_object)}")
# ✅ Alternative: Lazy evaluation
logger.debug("Data: %s", lambda: expensive_serialization(large_object))
Multi-line Log Messages
Use proper formatting for readability:
# ✅ Good: Multi-line with clear structure
logger.error(
"Cluster validation failed:\n"
f" Operators ready: {operators_ready}\n"
f" Pods running: {pods_running}\n"
f" Storage healthy: {storage_healthy}\n"
f" Ceph status: {ceph_status}"
)
# ✅ Alternative: Multiple log calls
logger.error("Cluster validation failed:")
logger.error(f" Operators ready: {operators_ready}")
logger.error(f" Pods running: {pods_running}")
logger.error(f" Storage healthy: {storage_healthy}")
Structured Logging Context
Include context for easier log parsing:
# ✅ Good: Structured information
logger.info(
f"PVC operation: action=create, name={name}, size={size}, "
f"storage_class={sc}, namespace={namespace}"
)
# Can be parsed/filtered easily:
# grep "action=create" logs.txt
# grep "storage_class=ocs-storagecluster-ceph-rbd" logs.txt
Testing Your Logging
Verify logging in unit tests:
def test_function_logging(caplog):
"""Test that function logs appropriately"""
with caplog.at_level(logging.INFO):
result = my_function()
# Verify expected log messages
assert "Starting operation" in caplog.text
assert "Operation completed" in caplog.text
# Verify log levels
assert any(
record.levelname == "TEST_STEP" and "Deploy resources" in record.message
for record in caplog.records
)
Quick Reference
When to Use Each Level
Scenario |
Level |
Method |
|---|---|---|
Test phase starts |
TEST_STEP |
|
Assertion check |
ASSERTION |
|
Resource created successfully |
INFO |
|
Operation progress update |
INFO |
|
API request/response details |
DEBUG |
|
Variable values during debugging |
DEBUG |
|
Retry attempt |
WARNING |
|
Deprecated feature used |
WARNING |
|
Operation failed |
ERROR |
|
Exception caught (with traceback) |
ERROR |
|
Test failed |
ERROR |
|
Framework crash |
CRITICAL |
|
ML prediction made |
AI_DATA |
|
Common Commands
# Standard imports
import logging
logger = logging.getLogger(__name__)
# Custom levels
logger.test_step("Major test phase")
logger.assertion("expected='X', actual='Y'")
logger.ai_data("ML prediction: ...")
# Standard levels
logger.info("Operation progress")
logger.debug("Detailed diagnostics")
logger.warning("Potential issue")
logger.error("Operation failed")
logger.critical("System failure")
# Exception logging (use in except blocks)
try:
risky_operation()
except Exception as e:
logger.exception(f"Operation failed: {e}") # Includes traceback
raise
# Reset step counter (for loops)
from ocs_ci.framework.custom_logger import reset_step_counts
reset_step_counts(__name__)
# Check if level enabled (performance)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Expensive: {expensive_call()}")
Summary
Use the right level: TEST_STEP for phases, ASSERTION for validations, INFO for progress, DEBUG for details
Use logger.exception(): Always use in exception handlers to automatically capture tracebacks
Be specific: Include context, values, and outcomes in log messages
Think about the reader: Your logs should tell the story of what happened
Don’t over-log: Too many logs hide important information
Never log secrets: Sanitize sensitive data
Test your logging: Verify logs provide useful debugging information
Following these guidelines helps create logs that are:
Scannable - Easy to find important information
Debuggable - Provide enough context to diagnose issues
Consistent - Follow predictable patterns across the codebase
Actionable - Clear what succeeded, failed, or needs attention
Automation
The /update-logging skill can automatically apply these guidelines to Python files:
/update-logging tests/functional/test_example.py
The skill reads this guide as its authoritative source and applies the patterns documented here.
See .claude/skills/update-logging/SKILL.md for details.