OCS-CI Logging Guide for Contributors

This guide explains how to use logging effectively in the OCS-CI framework. Following these guidelines ensures consistent, readable logs that make debugging easier for everyone.

Table of Contents

  1. Quick Start

  2. Log Level Selection Guide

  3. Usage by Code Type

  4. Common Patterns

  5. Anti-Patterns to Avoid

  6. Special Topics

  7. Quick Reference

  8. Summary


Quick Start

Getting the Logger

All code should use the standard Python logging pattern:

import logging

logger = logging.getLogger(__name__)

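The framework automatically provides a custom OCSCILogger, so loggers obtained this way also expose the extended log levels (TEST_STEP, ASSERTION, AI_DATA) listed under Available Log Levels.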

Important: The logger creation statement should be placed immediately after import statements, before any other code.

Best Practice

# test_example.py
import logging
import pytest
from ocs_ci.framework.testlib import tier1, ManageTest
from ocs_ci.ocs.resources.pvc import create_pvc

logger = logging.getLogger(__name__)  # First line after imports


@tier1
class TestExample(ManageTest):
    """Example test class"""

    def test_something(self):
        logger.info("Test starting")
        # Test code here

Why Use __name__?

Using logging.getLogger(__name__) creates a logger named after the module (e.g., tests.functional.test_pvc or ocs_ci.helpers.helpers). This:

  • Creates a hierarchical logger structure

  • Makes it easy to filter logs by module

  • Helps identify the source of log messages

  • Follows Python logging best practices
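
The benefits listed above can be seen with the standard library alone. This is a minimal sketch: the logger names reuse the example module names from this section, and the thresholds are arbitrary values chosen for the demonstration.

```python
import logging

# In real code each module calls logging.getLogger(__name__); the names are
# spelled out here so the sketch runs on its own.
helpers_pkg = logging.getLogger("ocs_ci.helpers")
helpers_mod = logging.getLogger("ocs_ci.helpers.helpers")
test_mod = logging.getLogger("tests.functional.test_pvc")

# Dotted names form a tree: the package logger is the parent of the module logger.
print(helpers_mod.parent is helpers_pkg)  # True

# A threshold set on a package logger applies to every module below it.
helpers_pkg.setLevel(logging.WARNING)
logging.getLogger("tests").setLevel(logging.DEBUG)

print(helpers_mod.isEnabledFor(logging.INFO))  # False: inherits WARNING
print(test_mod.isEnabledFor(logging.DEBUG))    # True: inherits DEBUG

# getLogger() returns the same object for the same name.
print(logging.getLogger("ocs_ci.helpers.helpers") is helpers_mod)  # True
```

Because levels are inherited down the dotted name, quieting or amplifying one package never requires touching the modules inside it.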

What NOT to Do

# ❌ Wrong: Using a custom string
logger = logging.getLogger("my_logger")

# ❌ Wrong: Getting the root logger
logger = logging.getLogger()

# ❌ Wrong: Creating logger inside functions
def my_function():
    logger = logging.getLogger(__name__)  # Redundant lookup on every call; define it once at module level
    logger.info("Message")

# ❌ Wrong: Placing logger creation after other code
import logging
from ocs_ci.framework.testlib import tier1

CONSTANT = "value"

def helper_function():
    return "data"

logger = logging.getLogger(__name__)  # Should be right after imports

Available Log Levels

From highest to lowest severity:

Level      Value  Method              Use For
---------  -----  ------------------  ---------------------------------------
CRITICAL   50     logger.critical()   System failures, framework crashes
ERROR      40     logger.error()      Test failures, exceptions, errors
WARNING    30     logger.warning()    Deprecations, retries, potential issues
TEST_STEP  25     logger.test_step()  Major test workflow phases
INFO       20     logger.info()       General progress, operations
ASSERTION  15     logger.assertion()  Test validations and assertions
DEBUG      10     logger.debug()      Detailed diagnostic info
AI_DATA    5      logger.ai_data()    ML predictions, metrics, analysis
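
The numeric values decide which messages survive a given threshold. The sketch below registers the three non-standard levels with the standard library and lists what a logger set to INFO or DEBUG would emit. The registration approach and the `visible_at` helper are assumptions for illustration; OCSCILogger may be implemented differently.

```python
import logging

# Values taken from the level table; names registered for display purposes.
CUSTOM_LEVELS = {"TEST_STEP": 25, "ASSERTION": 15, "AI_DATA": 5}
for name, value in CUSTOM_LEVELS.items():
    logging.addLevelName(value, name)

# Combine the standard and custom levels, highest severity first.
levels = {n: getattr(logging, n) for n in ("CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG")}
levels.update(CUSTOM_LEVELS)
ordered = sorted(levels, key=levels.get, reverse=True)


def visible_at(threshold):
    """Hypothetical helper: level names a logger set to `threshold` would emit."""
    probe = logging.Logger("sketch.probe", level=levels[threshold])
    return [name for name in ordered if probe.isEnabledFor(levels[name])]


print(logging.getLevelName(25))  # TEST_STEP
print(visible_at("INFO"))        # ['CRITICAL', 'ERROR', 'WARNING', 'TEST_STEP', 'INFO']
print(visible_at("DEBUG"))       # adds 'ASSERTION' and 'DEBUG', still no 'AI_DATA'
```

Two consequences are worth remembering: ASSERTION messages are hidden when the threshold is INFO, and AI_DATA messages are hidden even at DEBUG.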


Log Level Selection Guide

CRITICAL (50) - System Failures

When to use:

  • Framework initialization failures

  • Unrecoverable cluster state

  • Critical resource exhaustion

  • Infrastructure failures requiring immediate attention

Examples:

# Framework initialization
try:
    initialize_framework()
except Exception as e:
    logger.critical(f"Framework initialization failed: {e}")
    raise

# Critical resource state
if not check_cluster_api_accessible():
    logger.critical("Cluster API is unreachable - cannot continue testing")
    sys.exit(1)

# OCP/OCS critical failures
if storage_cluster_state == "Failed":
    logger.critical(
        f"StorageCluster in Failed state: {get_cluster_conditions()}"
    )

Output:

CRITICAL - Framework initialization failed: Cannot load config from /path/to/config
CRITICAL - Cluster API is unreachable - cannot continue testing
CRITICAL - StorageCluster in Failed state: Phase=Failed, Message='Ceph cluster down'

ERROR (40) - Test Failures & Exceptions

When to use:

  • Test failures (caught by pytest)

  • Exceptions during operations

  • Resource creation/deletion failures

  • Validation failures that block progress

Examples:

# Resource creation failure
try:
    pvc = create_pvc(sc_name="ocs-storagecluster-ceph-rbd", size="10Gi")
except Exception as e:
    logger.error(f"Failed to create PVC: {e}")
    raise

# Timeout errors
if not wait_for_resource_state(pvc, "Bound", timeout=300):
    logger.error(
        f"PVC {pvc.name} did not reach Bound state after 300s. "
        f"Current state: {pvc.status.phase}"
    )
    raise TimeoutError("PVC creation timeout")

# Deployment failures
if deployment_failed:
    logger.error(
        f"Cluster deployment failed at stage '{current_stage}': {error_msg}"
    )
    logger.error(f"Deployment logs: {get_deployment_logs()}")

Output:

ERROR - Failed to create PVC: API server returned 500: Internal Server Error
ERROR - PVC test-pvc-rbd did not reach Bound state after 300s. Current state: Pending
ERROR - Cluster deployment failed at stage 'bootstrap': timeout waiting for bootstrap complete

WARNING (30) - Potential Issues

When to use:

  • Deprecated features/parameters

  • Retry attempts

  • Non-critical misconfigurations

  • Performance concerns (not failures)

  • Skipped optional operations

Examples:

# Deprecation warnings
def old_function(param):
    logger.warning(
        "old_function() is deprecated and will be removed in v5.0. "
        "Use new_function() instead."
    )
    return new_function(param)

# Retry attempts
for attempt in range(1, max_retries + 1):
    try:
        return perform_operation()
    except TemporaryError as e:
        logger.warning(
            f"Operation failed (attempt {attempt}/{max_retries}): {e}. "
            f"Retrying in {retry_delay}s..."
        )
        time.sleep(retry_delay)

# Configuration issues
if not config.get("optimal_setting"):
    logger.warning(
        "optimal_setting not configured. Using default value which may "
        "impact performance. Consider setting in your ocsci-conf file."
    )

# Resource constraints
current_memory = get_available_memory()
if current_memory < recommended_memory:
    logger.warning(
        f"Available memory ({current_memory}GB) is below recommended "
        f"({recommended_memory}GB). Tests may run slower."
    )

# Skipped operations
if not managed_service:
    logger.warning("Skipping CephFS replica check on non-managed deployments")
    return

Output:

WARNING - old_function() is deprecated and will be removed in v5.0. Use new_function() instead.
WARNING - Operation failed (attempt 2/3): Connection timeout. Retrying in 5s...
WARNING - optimal_setting not configured. Using default value which may impact performance. Consider setting in your ocsci-conf file.
WARNING - Available memory (8GB) is below recommended (16GB). Tests may run slower.

TEST_STEP (25) - Test Workflow Phases

When to use:

  • Major phases in test execution

  • Setup/teardown operations

  • Deployment stages

  • Workflow checkpoints

Key features:

  • Automatic step numbering per logger

  • Helps visualize test flow

  • Makes logs scannable

Examples:

# Test workflow
def test_pvc_snapshot_restore(self):
    """Test PVC snapshot creation and restoration"""

    logger.test_step("Create source PVC and write test data")
    source_pvc = create_pvc(size="5Gi")
    write_test_data(source_pvc, pattern="random", size="1Gi")
    original_checksum = calculate_checksum(source_pvc)

    logger.test_step("Create volume snapshot")
    snapshot = create_snapshot(source_pvc)
    wait_for_snapshot_ready(snapshot, timeout=300)

    logger.test_step("Restore snapshot to new PVC")
    restored_pvc = restore_from_snapshot(snapshot, size="5Gi")
    wait_for_pvc_bound(restored_pvc, timeout=300)

    logger.test_step("Verify data integrity in restored PVC")
    restored_checksum = calculate_checksum(restored_pvc)
    assert original_checksum == restored_checksum, "Data mismatch after restore"

    logger.test_step("Cleanup test resources")
    delete_pvc(restored_pvc)
    delete_snapshot(snapshot)
    delete_pvc(source_pvc)

# Setup/teardown
def setup_method(self):
    """Setup test environment"""

    logger.test_step("Create dedicated test namespace")
    self.namespace = create_namespace(name="test-mcg")

    logger.test_step("Deploy MCG resources")
    deploy_mcg_components(namespace=self.namespace)

    logger.test_step("Configure S3 test credentials")
    self.s3_creds = create_s3_credentials()

def teardown_method(self):
    """Cleanup test environment"""

    logger.test_step("Delete test buckets and objects")
    cleanup_s3_resources(self.s3_creds)

    logger.test_step("Remove MCG resources")
    delete_mcg_components(namespace=self.namespace)

    logger.test_step("Delete test namespace")
    delete_namespace(self.namespace)

# Deployment stages
def deploy_ocp_cluster(self):
    """Deploy OpenShift cluster"""

    logger.test_step("Generate cluster installation manifests")
    generate_manifests(cluster_name=self.cluster_name)

    logger.test_step("Deploy bootstrap node")
    deploy_bootstrap_node()

    logger.test_step("Deploy control plane nodes")
    deploy_control_plane(node_count=3)

    logger.test_step("Wait for bootstrap complete")
    wait_for_bootstrap_complete(timeout=1800)

    logger.test_step("Deploy worker nodes")
    deploy_worker_nodes(node_count=3)

    logger.test_step("Finalize cluster installation")
    finalize_installation()

# Complex multi-phase test
def test_cluster_upgrade(self):
    """Test ODF cluster upgrade from 4.15 to 4.16"""

    logger.test_step("Verify pre-upgrade cluster health")
    verify_cluster_health()
    verify_all_pods_running()

    logger.test_step("Backup current cluster configuration")
    backup = create_cluster_backup()

    logger.test_step("Update operator subscription channel")
    update_subscription(channel="stable-4.16")

    logger.test_step("Monitor operator upgrade progress")
    wait_for_operator_upgrade(timeout=1800)

    logger.test_step("Wait for all pods to be updated")
    wait_for_pod_rollout_complete(timeout=1800)

    logger.test_step("Verify post-upgrade cluster health")
    verify_cluster_health()
    verify_storage_functionality()

    logger.test_step("Run post-upgrade validation suite")
    run_smoke_tests()

Output:

TEST_STEP 1 - Create source PVC and write test data
TEST_STEP 2 - Create volume snapshot
TEST_STEP 3 - Restore snapshot to new PVC
TEST_STEP 4 - Verify data integrity in restored PVC
TEST_STEP 5 - Cleanup test resources
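
Automatic numbering of this kind could be built on a per-logger counter. The following is a sketch under stated assumptions, not the framework's implementation: `StepLogger` and `ListHandler` are hypothetical names, and the sketch never resets the counter, whereas the real logger may reset it between tests.

```python
import logging

TEST_STEP = 25  # value from the level table
logging.addLevelName(TEST_STEP, "TEST_STEP")


class StepLogger(logging.Logger):
    """Hypothetical logger that numbers its own test steps."""

    def __init__(self, name, level=logging.NOTSET):
        super().__init__(name, level)
        self._step = 0  # one counter per logger instance

    def test_step(self, msg, *args, **kwargs):
        # Only count steps that will actually be emitted.
        if self.isEnabledFor(TEST_STEP):
            self._step += 1
            self._log(TEST_STEP, f"{self._step} - {msg}", args, **kwargs)


class ListHandler(logging.Handler):
    """Collects formatted lines so the demo can inspect them."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(f"{record.levelname} {record.getMessage()}")


captured = ListHandler()
steps = StepLogger("sketch.steps")
steps.addHandler(captured)

steps.test_step("Create source PVC and write test data")
steps.test_step("Create volume snapshot")

for line in captured.lines:
    print(line)
# TEST_STEP 1 - Create source PVC and write test data
# TEST_STEP 2 - Create volume snapshot
```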

INFO (20) - General Progress

When to use:

  • Successful operations

  • High-level progress milestones (starting, completion, major checkpoints)

  • State changes

  • Resource creation/deletion (successful)

  • Configuration details

  • Most day-to-day logging

Note: Use DEBUG for iteration-level details within loops. INFO should mark the start, completion, or major milestones of an operation, not each iteration.

Examples:

# Resource operations
def create_pvc(name, size, sc_name):
    logger.info(
        f"Creating PVC '{name}': size={size}, storage_class={sc_name}"
    )
    pvc = PVC(name=name, size=size, storage_class=sc_name)
    pvc.create()

    logger.info(f"PVC '{name}' created successfully")
    return pvc

# High-level progress milestones
def wait_for_pods_ready(namespace, timeout=300):
    logger.info(
        f"Waiting for all pods in namespace '{namespace}' to be ready "
        f"(timeout: {timeout}s)"
    )

    start_time = time.time()
    while time.time() - start_time < timeout:
        pods = get_pods(namespace)
        ready_count = sum(1 for p in pods if p.is_ready())

        # Use DEBUG for iteration details
        logger.debug(
            f"Pod readiness check: {ready_count}/{len(pods)} pods ready"
        )

        if ready_count == len(pods):
            logger.info(f"All {len(pods)} pods ready")
            return True

        time.sleep(10)

    logger.warning(
        f"Timeout waiting for pods in '{namespace}': "
        f"{ready_count}/{len(pods)} ready after {timeout}s"
    )
    return False

# Configuration
def setup_cluster_monitoring(config):
    logger.info("Configuring cluster monitoring")
    logger.info(f"Retention period: {config['retention_days']} days")
    logger.info(f"Storage size: {config['storage_size']}")
    logger.info(f"Storage class: {config['storage_class']}")

    apply_monitoring_config(config)
    logger.info("Cluster monitoring configured successfully")

# State changes
def scale_deployment(deployment_name, replicas):
    current_replicas = get_deployment_replicas(deployment_name)
    logger.info(
        f"Scaling deployment '{deployment_name}': "
        f"{current_replicas} -> {replicas} replicas"
    )

    scale(deployment_name, replicas)
    logger.info(f"Deployment '{deployment_name}' scaled successfully")

Output:

INFO - Creating PVC 'test-pvc-rbd': size=10Gi, storage_class=ocs-storagecluster-ceph-rbd
INFO - PVC 'test-pvc-rbd' created successfully
INFO - Waiting for all pods in namespace 'openshift-storage' to be ready (timeout: 300s)
INFO - All 12 pods ready
INFO - Configuring cluster monitoring
INFO - Retention period: 7 days
INFO - Storage size: 100Gi
INFO - Storage class: ocs-storagecluster-ceph-rbd
INFO - Cluster monitoring configured successfully
INFO - Scaling deployment 'rook-ceph-mon': 3 -> 5 replicas
INFO - Deployment 'rook-ceph-mon' scaled successfully

ASSERTION (15) - Test Validations

When to use:

  • Before/after Python assertions

  • Test checkpoint validations

  • Expected vs actual comparisons

  • Pass/fail validations

Examples:

# Basic assertion logging
def test_pvc_creation(self):
    pvc = create_pvc(size="10Gi")

    logger.assertion(
        f"PVC status: expected='Bound', actual='{pvc.status}'"
    )
    assert pvc.status == "Bound", f"PVC not bound: {pvc.status}"

# Numeric comparisons
def test_performance_threshold(self):
    iops = measure_iops()
    min_iops = 1000

    logger.assertion(
        f"IOPS check: measured={iops}, minimum={min_iops}, "
        f"passed={iops >= min_iops}"
    )
    assert iops >= min_iops, f"IOPS below threshold: {iops} < {min_iops}"

# Multiple conditions
def test_cluster_health(self):
    health_checks = {
        "operators_ready": check_operators(),
        "storage_cluster_healthy": check_storage_cluster(),
        "ceph_healthy": check_ceph_health(),
        "all_pods_running": check_all_pods_ready(),
    }

    for check_name, result in health_checks.items():
        logger.assertion(f"{check_name}: {result}")
        assert result, f"Health check failed: {check_name}"

# List/collection validations
def test_pod_count(self):
    pods = get_pods_in_namespace("openshift-storage")
    expected_count = 12

    logger.assertion(
        f"Pod count: expected={expected_count}, actual={len(pods)}, "
        f"match={len(pods) == expected_count}"
    )
    assert len(pods) == expected_count, (
        f"Expected {expected_count} pods, found {len(pods)}"
    )

# Validation with tolerance
def test_capacity_usage(self):
    usage_pct = get_cluster_capacity_usage()
    expected = 75.0
    tolerance = 5.0

    within_tolerance = abs(usage_pct - expected) <= tolerance
    logger.assertion(
        f"Capacity usage: measured={usage_pct}%, expected={expected}%, "
        f"tolerance=±{tolerance}%, passed={within_tolerance}"
    )
    assert within_tolerance, (
        f"Usage {usage_pct}% outside expected range {expected}±{tolerance}%"
    )

Output:

ASSERTION - PVC status: expected='Bound', actual='Bound'
ASSERTION - IOPS check: measured=1250, minimum=1000, passed=True
ASSERTION - operators_ready: True
ASSERTION - storage_cluster_healthy: True
ASSERTION - ceph_healthy: True
ASSERTION - all_pods_running: True
ASSERTION - Pod count: expected=12, actual=12, match=True
ASSERTION - Capacity usage: measured=73.5%, expected=75.0%, tolerance=±5.0%, passed=True
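
Logging before the assert means the compared values are recorded even when the assertion fails. The sketch below shows this with the standard library only. `check_equal` and `ListHandler` are hypothetical helpers, and `logger.log(15, ...)` stands in for `logger.assertion()` so the example runs without the framework.

```python
import logging

ASSERTION = 15  # value from the level table
logging.addLevelName(ASSERTION, "ASSERTION")

logger = logging.getLogger("sketch.assertions")
logger.setLevel(ASSERTION)
logger.propagate = False  # keep demo output away from the root logger


class ListHandler(logging.Handler):
    """Collects formatted lines so the demo can inspect them."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(f"{record.levelname} - {record.getMessage()}")


captured = ListHandler()
logger.addHandler(captured)


def check_equal(label, expected, actual):
    """Hypothetical helper: log the comparison first, then assert on it."""
    logger.log(ASSERTION, f"{label}: expected={expected!r}, actual={actual!r}")
    assert actual == expected, f"{label} mismatch: {actual!r} != {expected!r}"


check_equal("PVC status", "Bound", "Bound")

failure = None
try:
    check_equal("PVC status", "Bound", "Pending")
except AssertionError as exc:
    failure = str(exc)

print(captured.lines[-1])  # ASSERTION - PVC status: expected='Bound', actual='Pending'
print(failure)             # PVC status mismatch: 'Pending' != 'Bound'
```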

DEBUG (10) - Detailed Diagnostics

When to use:

  • Internal function flow

  • Variable values during debugging

  • API request/response details

  • Detailed resource states

  • Iteration-level details within loops (not completion/start)

  • Helper function internals

Note: Use DEBUG for per-iteration updates in loops. Use INFO to mark when the overall operation starts or completes.

Examples:

# Function flow
def create_and_attach_pvc(pod_name, pvc_name, size):
    logger.debug(f"Entering create_and_attach_pvc: pod={pod_name}, pvc={pvc_name}")

    logger.debug(f"Creating PVC with size={size}")
    pvc = create_pvc(pvc_name, size)

    logger.debug(f"PVC object created: {pvc.to_dict()}")

    logger.debug(f"Attaching PVC to pod {pod_name}")
    attach_pvc_to_pod(pod_name, pvc)

    logger.debug("Attachment successful")
    return pvc

# API interactions
def get_storage_cluster():
    # StorageCluster is served by the ocs.openshift.io API group
    path = "/apis/ocs.openshift.io/v1/storageclusters"

    logger.debug("Fetching StorageCluster resource")
    logger.debug(f"API endpoint: {API_BASE}{path}")

    response = api_client.get(path, namespace="openshift-storage")

    logger.debug(f"API response status: {response.status_code}")
    body = response.json()  # Parse once; reuse for logging and the return value
    logger.debug(f"API response body: {body}")

    return body

# Variable inspection
def calculate_required_nodes(capacity_gb, node_size_gb):
    logger.debug(f"Calculating required nodes: capacity={capacity_gb}GB, node_size={node_size_gb}GB")

    nodes_needed = math.ceil(capacity_gb / node_size_gb)
    logger.debug(f"Raw calculation: ceil({capacity_gb} / {node_size_gb}) = {nodes_needed}")

    # Add replica overhead
    with_replica = nodes_needed * 3
    logger.debug(f"With 3x replica: {with_replica}")

    # Add headroom
    final_count = math.ceil(with_replica * 1.2)
    logger.debug(f"With 20% headroom: {final_count}")

    return final_count

# Iteration-level details (use INFO for start/completion, DEBUG for iterations)
def wait_for_resource(resource_name, desired_state, timeout=300):
    logger.info(
        f"Waiting for resource '{resource_name}' to reach '{desired_state}' "
        f"(timeout: {timeout}s)"
    )

    start_time = time.time()
    iteration = 0

    while time.time() - start_time < timeout:
        iteration += 1
        current_state = get_resource_state(resource_name)

        # DEBUG for each iteration
        logger.debug(
            f"Wait iteration {iteration}: current_state={current_state}, "
            f"elapsed={int(time.time() - start_time)}s"
        )

        if current_state == desired_state:
            logger.info(
                f"Resource '{resource_name}' reached '{desired_state}' "
                f"after {int(time.time() - start_time)}s"
            )
            return True

        time.sleep(5)

    logger.warning(
        f"Timeout waiting for resource '{resource_name}': "
        f"expected='{desired_state}', actual='{current_state}', timeout={timeout}s"
    )
    return False

# Detailed resource state
def verify_storage_cluster():
    sc = get_storage_cluster()

    logger.debug(f"StorageCluster phase: {sc.status.phase}")
    logger.debug(f"StorageCluster conditions: {sc.status.conditions}")
    logger.debug(f"OSD count: {len(sc.status.osd_deployment_status)}")

    for osd in sc.status.osd_deployment_status:
        logger.debug(f"OSD {osd.name}: state={osd.state}, node={osd.node}")

Output:

DEBUG - Entering create_and_attach_pvc: pod=test-pod, pvc=test-pvc
DEBUG - Creating PVC with size=10Gi
DEBUG - PVC object created: {'kind': 'PersistentVolumeClaim', 'metadata': {...}}
DEBUG - Fetching StorageCluster resource
DEBUG - API response status: 200
DEBUG - Wait iteration 5: current_state=Pending, elapsed=25s
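
The reason for keeping per-iteration messages at DEBUG becomes clear when the same loop runs under two thresholds. In this sketch, `poll` walks a fixed list of states instead of querying a cluster; the function, the state list, and `ListHandler` are illustrative assumptions.

```python
import logging

logger = logging.getLogger("sketch.iteration")
logger.propagate = False  # keep demo output away from the root logger


class ListHandler(logging.Handler):
    """Collects formatted lines so the demo can inspect them."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(f"{record.levelname} - {record.getMessage()}")


captured = ListHandler()
logger.addHandler(captured)


def poll(states):
    """Hypothetical wait loop: INFO at start and completion, DEBUG per iteration."""
    logger.info("Waiting for resource to reach 'Bound'")
    for iteration, state in enumerate(states, start=1):
        logger.debug(f"Wait iteration {iteration}: current_state={state}")
        if state == "Bound":
            logger.info(f"Resource reached 'Bound' after {iteration} checks")
            return True
    logger.warning("Resource never reached 'Bound'")
    return False


states = ["Pending", "Pending", "Pending", "Bound"]

logger.setLevel(logging.INFO)
poll(states)
info_lines = list(captured.lines)

captured.lines.clear()
logger.setLevel(logging.DEBUG)
poll(states)
debug_lines = list(captured.lines)

print(len(info_lines))   # 2: start and completion only
print(len(debug_lines))  # 6: the same two lines plus four iteration lines
```

With a real timeout of several minutes and a 5-second poll interval, the gap between the two counts grows to dozens of lines per wait.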

AI_DATA (5) - ML/AI Information

When to use:

  • Machine learning predictions

  • AI-generated insights

  • Model inference details

  • Anomaly detection results

  • Performance predictions

  • Training/inference metrics

Note: This level is below DEBUG and requires explicit enabling. Only use for AI/ML features.

Examples:

# Failure prediction
def predict_disk_failure(disk_id):
    features = extract_disk_features(disk_id)

    logger.ai_data(f"Model input features: {features}")

    prediction = ml_model.predict(features)
    probability = prediction['failure_probability']

    logger.ai_data(
        f"Disk failure prediction: disk={disk_id}, "
        f"probability={probability:.2%}, model=disk_health_v2"
    )

    if probability > 0.8:
        logger.warning(
            f"High disk failure probability detected: {disk_id} ({probability:.2%})"
        )

    return prediction

# Anomaly detection
def detect_performance_anomaly(metrics):
    baseline = get_baseline_metrics()
    z_score = calculate_z_score(metrics, baseline)

    logger.ai_data(
        f"Performance metrics: current={metrics}, baseline={baseline}, "
        f"z_score={z_score:.2f}"
    )

    if z_score > 2.0:
        logger.ai_data(
            f"Anomaly detected: z_score={z_score:.2f} exceeds threshold=2.0"
        )
        logger.warning(
            f"Performance anomaly: latency {z_score:.1f}σ above baseline"
        )

# Cluster scaling recommendation
def recommend_cluster_scale():
    current_utilization = get_cluster_utilization()
    predicted_peak = predict_peak_utilization()

    logger.ai_data(
        f"Utilization analysis: current={current_utilization:.1%}, "
        f"predicted_peak={predicted_peak:.1%}"
    )

    if predicted_peak > 0.9:
        recommendation = calculate_node_recommendation()
        logger.ai_data(
            f"Scale recommendation: add {recommendation['nodes']} nodes "
            f"(confidence={recommendation['confidence']:.2%})"
        )
        logger.info(
            f"Cluster scale recommended: add {recommendation['nodes']} nodes "
            f"to handle predicted load"
        )

# Test runtime prediction
def predict_test_runtime(test_name):
    historical_data = get_test_history(test_name)

    logger.ai_data(f"Historical runtimes: {historical_data}")

    prediction = time_series_model.predict(test_name)

    logger.ai_data(
        f"Test runtime prediction: test={test_name}, "
        f"predicted={prediction['duration']}min, "
        f"confidence_interval=±{prediction['std']}min, "
        f"model=runtime_predictor_v1"
    )

    return prediction

Output:

AI_DATA - Disk failure prediction: disk=sda, probability=85.30%, model=disk_health_v2
AI_DATA - Performance metrics: current={'latency': 150}, baseline={'latency': 50}, z_score=3.20
AI_DATA - Anomaly detected: z_score=3.20 exceeds threshold=2.0
AI_DATA - Scale recommendation: add 2 nodes (confidence=94.20%)
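
Because AI_DATA sits below DEBUG, both the logger and its handlers must allow level 5 before these messages appear. The sketch below shows the two gates using the standard library. How OCS-CI exposes this switch (configuration key, command-line option) is not covered here, and `ListHandler` is a hypothetical helper.

```python
import logging

AI_DATA = 5  # value from the level table
logging.addLevelName(AI_DATA, "AI_DATA")


class ListHandler(logging.Handler):
    """Collects formatted lines so the demo can inspect them."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(f"{record.levelname} - {record.getMessage()}")


logger = logging.getLogger("sketch.ai")
logger.propagate = False   # keep demo output away from the root logger
logger.setLevel(AI_DATA)   # gate 1: the logger accepts AI_DATA

captured = ListHandler()
captured.setLevel(logging.DEBUG)  # gate 2: the handler still stops at DEBUG
logger.addHandler(captured)

logger.log(AI_DATA, "probability=0.85")
dropped = list(captured.lines)

captured.setLevel(AI_DATA)  # lower the handler threshold as well
logger.log(AI_DATA, "probability=0.85")
delivered = list(captured.lines)

print(dropped)    # []
print(delivered)  # ['AI_DATA - probability=0.85']
```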

Usage by Code Type

Test Files (tests/)

Tests should use TEST_STEP and ASSERTION liberally to make test flow clear:

import logging
import pytest
from ocs_ci.framework.testlib import tier1, ManageTest
from ocs_ci.ocs.resources.pvc import create_pvc, delete_pvc

logger = logging.getLogger(__name__)


@tier1
@pytest.mark.polarion_id("OCS-1234")
class TestPVCLifecycle(ManageTest):
    """Test PVC lifecycle operations"""

    def test_pvc_create_delete(self):
        """
        Test PVC creation and deletion

        Steps:
        1. Create PVC
        2. Verify PVC is bound
        3. Delete PVC
        4. Verify PVC is removed
        """
        pvc_name = "test-pvc-lifecycle"

        logger.test_step("Create PVC with RBD storage class")
        pvc = create_pvc(
            name=pvc_name,
            size="10Gi",
            storage_class="ocs-storagecluster-ceph-rbd"
        )

        logger.test_step("Verify PVC reaches Bound state")
        pvc.reload()
        logger.assertion(
            f"PVC status: expected='Bound', actual='{pvc.status}'"
        )
        assert pvc.status == "Bound", f"PVC not bound: {pvc.status}"

        logger.test_step("Delete PVC")
        delete_pvc(pvc)

        logger.test_step("Verify PVC is removed from cluster")
        deleted = verify_pvc_deleted(pvc_name, timeout=60)
        logger.assertion(
            f"PVC deletion: pvc={pvc_name}, deleted={deleted}"
        )
        assert deleted, f"PVC {pvc_name} still exists after deletion"

Deployment Code (ocs_ci/deployment/)

Deployment code should use TEST_STEP for major phases, INFO for progress:

import logging
from ocs_ci.deployment.helpers import create_manifest, run_installer

logger = logging.getLogger(__name__)


class OCPDeployment:
    """OpenShift cluster deployment"""

    def deploy(self):
        """Deploy OpenShift cluster"""

        logger.test_step("Validate deployment prerequisites")
        self._validate_prerequisites()

        logger.test_step("Generate installation manifests")
        manifest_dir = self._generate_manifests()
        logger.info(f"Manifests generated in: {manifest_dir}")

        logger.test_step("Deploy infrastructure")
        self._deploy_infrastructure()

        logger.test_step("Bootstrap cluster")
        self._bootstrap_cluster()

        logger.test_step("Complete installation")
        self._complete_installation()

        logger.info("Cluster deployment completed successfully")

    def _validate_prerequisites(self):
        """Validate prerequisites"""
        logger.info("Checking pull secret")
        if not self.pull_secret_exists():
            logger.error("Pull secret not found")
            raise FileNotFoundError("Missing pull secret")

        logger.info("Checking cloud credentials")
        if not self.credentials_valid():
            logger.error("Cloud credentials invalid or missing")
            raise ValueError("Invalid credentials")

        logger.debug(f"Using region: {self.region}")
        logger.debug(f"Using instance type: {self.instance_type}")

    def _deploy_infrastructure(self):
        """Deploy infrastructure resources"""
        logger.info("Creating VPC")
        self.vpc_id = self.cloud_provider.create_vpc()
        logger.info(f"VPC created: {self.vpc_id}")

        logger.info("Creating subnets")
        self.subnets = self.cloud_provider.create_subnets(self.vpc_id)
        logger.info(f"Created {len(self.subnets)} subnets")

        logger.debug(f"Subnet IDs: {[s.id for s in self.subnets]}")

Helper Functions (ocs_ci/helpers/)

Helpers should use INFO for operations, DEBUG for details:

import logging
import time
from ocs_ci.ocs.resources.pod import get_pods
from ocs_ci.ocs.resources.storage_cluster import get_storage_cluster
from ocs_ci.ocs import constants

logger = logging.getLogger(__name__)


def wait_for_pods_ready(namespace, pod_count=None, timeout=300):
    """
    Wait for pods to be ready in namespace

    Args:
        namespace (str): Namespace name
        pod_count (int): Expected pod count (optional)
        timeout (int): Timeout in seconds

    Returns:
        bool: True if all pods ready, False otherwise
    """
    logger.info(
        f"Waiting for pods in namespace '{namespace}' (timeout: {timeout}s)"
    )

    if pod_count:
        logger.debug(f"Expecting {pod_count} pods")

    start_time = time.time()
    iteration = 0

    while time.time() - start_time < timeout:
        iteration += 1
        pods = get_pods(namespace)

        ready_count = sum(1 for p in pods if p.is_ready())
        logger.debug(
            f"Iteration {iteration}: {ready_count}/{len(pods)} pods ready"
        )

        if pod_count and len(pods) != pod_count:
            logger.debug(
                f"Pod count mismatch: expected={pod_count}, actual={len(pods)}"
            )
            time.sleep(5)
            continue

        if ready_count == len(pods):
            elapsed = int(time.time() - start_time)
            logger.info(f"All {len(pods)} pods ready after {elapsed}s")
            return True

        time.sleep(5)

    logger.warning(
        f"Timeout waiting for pods in '{namespace}': "
        f"{ready_count}/{len(pods)} ready after {timeout}s"
    )
    return False


def get_cluster_capacity():
    """
    Get cluster storage capacity information

    Returns:
        dict: Capacity information
    """
    logger.debug("Fetching cluster capacity")

    storage_cluster = get_storage_cluster()
    logger.debug(f"StorageCluster name: {storage_cluster.name}")

    capacity_info = {
        'total': storage_cluster.status.capacity.total,
        'used': storage_cluster.status.capacity.used,
        'available': storage_cluster.status.capacity.available,
    }

    logger.debug(f"Capacity: {capacity_info}")

    usage_pct = (capacity_info['used'] / capacity_info['total']) * 100
    logger.info(f"Cluster capacity usage: {usage_pct:.1f}%")

    return capacity_info

Utilities (ocs_ci/utility/)

Utilities should focus on INFO for user-facing operations:

import logging
import sys
from ocs_ci.framework import config
from ocs_ci.ocs.resources.storage_cluster import get_storage_cluster

logger = logging.getLogger(__name__)


def report_cluster_version():
    """
    Report OCS/ODF cluster version information

    Command-line utility to display cluster versions
    """
    logger.info("Collecting cluster version information...")

    try:
        storage_cluster = get_storage_cluster()
    except Exception as e:
        logger.error(f"Failed to get StorageCluster: {e}")
        sys.exit(1)

    logger.debug(f"StorageCluster object: {storage_cluster.name}")

    version_info = {
        'ODF': storage_cluster.get_odf_version(),
        'Ceph': storage_cluster.get_ceph_version(),
        'NooBaa': storage_cluster.get_noobaa_version(),
    }

    logger.info("=== Cluster Version Information ===")
    for component, version in version_info.items():
        logger.info(f"{component:15s}: {version}")

    logger.debug("Version reporting complete")


def cleanup_test_resources(namespace):
    """
    Cleanup test resources in namespace

    Args:
        namespace (str): Namespace to clean
    """
    logger.info(f"Starting cleanup of namespace: {namespace}")

    logger.info("Deleting PVCs...")
    pvcs = get_all_pvcs(namespace)
    logger.info(f"Found {len(pvcs)} PVCs")

    for pvc in pvcs:
        logger.debug(f"Deleting PVC: {pvc.name}")
        try:
            delete_pvc(pvc)
        except Exception as e:
            logger.warning(f"Failed to delete PVC {pvc.name}: {e}")

    logger.info("Deleting pods...")
    pods = get_all_pods(namespace)
    logger.info(f"Found {len(pods)} pods")

    for pod in pods:
        logger.debug(f"Deleting pod: {pod.name}")
        try:
            delete_pod(pod)
        except Exception as e:
            logger.warning(f"Failed to delete pod {pod.name}: {e}")

    logger.info(f"Cleanup of namespace '{namespace}' completed")

Resource Classes (ocs_ci/ocs/resources/)

Resource classes should use DEBUG extensively, INFO for state changes:

import logging
import time
from ocs_ci.ocs import constants
from ocs_ci.ocs.ocp import OCP

logger = logging.getLogger(__name__)


class PVC:
    """Represents a PersistentVolumeClaim"""

    def __init__(self, name, namespace, size, storage_class):
        self.name = name
        self.namespace = namespace
        self.size = size
        self.storage_class = storage_class

        logger.debug(
            f"PVC object initialized: name={name}, namespace={namespace}, "
            f"size={size}, sc={storage_class}"
        )

    def create(self):
        """Create the PVC"""
        logger.info(
            f"Creating PVC '{self.name}' in namespace '{self.namespace}'"
        )

        manifest = self._generate_manifest()
        logger.debug(f"PVC manifest: {manifest}")

        ocp = OCP(kind=constants.PVC, namespace=self.namespace)
        ocp.create(manifest)

        logger.info(f"PVC '{self.name}' created")

    def reload(self):
        """Reload PVC state from cluster"""
        logger.debug(f"Reloading PVC state: {self.name}")

        ocp = OCP(kind=constants.PVC, namespace=self.namespace)
        data = ocp.get(resource_name=self.name)

        self.status = data['status']['phase']
        logger.debug(f"PVC {self.name} status: {self.status}")

    def wait_for_bound(self, timeout=300):
        """Wait for PVC to reach Bound state"""
        logger.info(
            f"Waiting for PVC '{self.name}' to be bound (timeout: {timeout}s)"
        )

        start_time = time.time()
        while time.time() - start_time < timeout:
            self.reload()
            logger.debug(f"Current status: {self.status}")

            if self.status == constants.STATUS_BOUND:
                elapsed = int(time.time() - start_time)
                logger.info(
                    f"PVC '{self.name}' bound after {elapsed}s"
                )
                return True

            time.sleep(5)

        logger.error(
            f"PVC '{self.name}' did not reach Bound state after {timeout}s"
        )
        return False

Common Patterns

Pattern: Progressive Logging

Use different levels as operations progress:

def complex_operation():
    """Example of progressive logging"""

    # High-level phase
    logger.test_step("Execute complex operation")

    # Operation details
    logger.info("Starting sub-operation A")
    logger.debug("Sub-operation A parameters: {...}")

    result_a = sub_operation_a()

    # Validation
    logger.assertion(f"Sub-operation A result: {result_a}")
    assert result_a == expected, "Operation A failed"

    # Continue
    logger.info("Starting sub-operation B")
    result_b = sub_operation_b()

    logger.info("Complex operation completed successfully")

Pattern: Retry Logging

Log retries clearly:

def retry_operation(max_attempts=3):
    """Operation with retry logic"""

    for attempt in range(1, max_attempts + 1):
        logger.debug(f"Attempt {attempt}/{max_attempts}")

        try:
            result = perform_operation()
            logger.info(f"Operation succeeded on attempt {attempt}")
            return result

        except TemporaryError as e:
            if attempt < max_attempts:
                logger.warning(
                    f"Operation failed (attempt {attempt}/{max_attempts}): {e}. "
                    f"Retrying in 5s..."
                )
                time.sleep(5)
            else:
                logger.error(
                    f"Operation failed after {max_attempts} attempts: {e}"
                )
                raise

Pattern: Iteration Logging

Use INFO for start/completion, DEBUG for iteration details:

def wait_for_condition(condition_func, timeout=300):
    """Wait for condition with proper iteration logging"""

    logger.info(f"Waiting for condition (timeout: {timeout}s)")

    start_time = time.time()
    iteration = 0

    while time.time() - start_time < timeout:
        iteration += 1

        # DEBUG for each iteration
        logger.debug(f"Checking condition (iteration {iteration})")

        if condition_func():
            elapsed = int(time.time() - start_time)
            logger.info(f"Condition met after {elapsed}s")
            return True

        time.sleep(5)

    logger.warning(f"Condition not met after {timeout}s")
    return False

Pattern: TimeoutSampler Logging

Use TimeoutSampler for polling operations while avoiding repetitive logs:

from ocs_ci.utility.utils import TimeoutSampler
from ocs_ci.ocs.resources.pod import get_all_pods
from ocs_ci.ocs import constants
from ocs_ci.ocs.exceptions import TimeoutExpiredError

def wait_for_pods_ready(namespace, timeout=300):
    """
    Wait for all pods to be ready using TimeoutSampler.

    Demonstrates how to limit repetitive logs during polling operations.
    """
    logger.info(f"Waiting for pods in '{namespace}' to be ready (timeout: {timeout}s)")

    try:
        # TimeoutSampler automatically rate-limits exception logs to once per minute
        # Log only state changes, not every iteration
        last_pod_count = None
        last_ready_count = None

        for pod_list in TimeoutSampler(
            timeout=timeout,
            sleep=10,
            func=get_all_pods,
            namespace=namespace
        ):
            total_pods = len(pod_list)
            ready_pods = sum(
                1 for pod in pod_list
                if pod.data["status"]["phase"] == constants.STATUS_RUNNING
            )

            # Log only when counts change (avoid duplicate logs)
            if (total_pods, ready_pods) != (last_pod_count, last_ready_count):
                logger.debug(
                    f"Pod status: {ready_pods}/{total_pods} ready"
                )
                last_pod_count = total_pods
                last_ready_count = ready_pods

            # Check condition
            if total_pods > 0 and ready_pods == total_pods:
                logger.info(f"All {total_pods} pods are ready")
                return True

    except TimeoutExpiredError:
        logger.error(
            f"Timeout waiting for pods in '{namespace}' to be ready after {timeout}s"
        )
        # Log final state for debugging
        logger.error(f"Final status: {last_ready_count}/{last_pod_count} pods ready")
        raise
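
The "log only when the state changes" idea can be factored into a small reusable helper. The following is a minimal sketch using only the standard library; log_on_change and ListHandler are hypothetical names introduced for illustration and are not part of OCS-CI or TimeoutSampler.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that collects messages so the demo can inspect them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


logger = logging.getLogger("demo.state_change")
logger.setLevel(logging.DEBUG)
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)


def log_on_change(samples, describe):
    """Yield each sample, logging at DEBUG only when describe(sample) changes."""
    last = object()  # Sentinel that never equals a real description
    for sample in samples:
        current = describe(sample)
        if current != last:
            logger.debug("State: %s", current)
            last = current
        yield sample


# Simulated (ready, total) pod counts from five polling iterations
polls = [(0, 3), (0, 3), (1, 3), (1, 3), (3, 3)]
for ready, total in log_on_change(polls, lambda s: f"{s[0]}/{s[1]} ready"):
    if ready == total:
        break
```

Five polls produce only three DEBUG lines here, one per distinct state. Any iterable of samples could be wrapped the same way, assuming its items can be summarized by the describe callable.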

Pattern: Conditional Logging

Use DEBUG for verbose details, INFO for summaries:

def process_items(items, verbose=False):
    """Process items with optional verbose logging"""

    logger.info(f"Processing {len(items)} items")

    for i, item in enumerate(items):
        logger.debug(f"Processing item {i+1}/{len(items)}: {item.name}")

        result = process_item(item)

        if verbose or result.has_issues():
            logger.info(f"Item {item.name}: {result.status}")

    logger.info("All items processed")

Pattern: Error Context

Provide context in error logs:

def create_resource(config):
    """Create resource with error context"""

    try:
        logger.info(f"Creating resource: {config['name']}")
        resource = Resource(config)
        resource.create()
        logger.info("Resource created successfully")
        return resource

    except APIError as e:
        logger.error(
            f"API error creating resource '{config['name']}': {e}"
        )
        logger.error(f"Config keys: {sorted(config)}")  # Keys only: values may contain secrets
        logger.error(f"API response: {e.response}")
        raise

    except Exception as e:
        logger.error(
            f"Unexpected error creating resource '{config['name']}': {e}",
            exc_info=True  # Include stack trace
        )
        raise

Pattern: Step Numbering in Loops

Reset step counters for independent iterations:

from ocs_ci.framework.custom_logger import reset_step_counts

def test_multiple_iterations(self):
    """Test with multiple independent iterations"""

    for iteration in range(3):
        logger.info(f"=== Iteration {iteration + 1}/3 ===")

        # Reset step counter for clean numbering
        reset_step_counts(__name__)

        logger.test_step("Setup iteration resources")
        setup_resources()

        logger.test_step("Run iteration workload")
        run_workload()

        logger.test_step("Validate iteration results")
        validate_results()

        logger.test_step("Cleanup iteration resources")
        cleanup_resources()

Anti-Patterns to Avoid

❌ Don’t: Excessive TEST_STEP Usage

# TOO GRANULAR - clutters logs
def test_bad_example(self):
    logger.test_step("Import libraries")  # ❌
    logger.test_step("Define variables")  # ❌
    logger.test_step("Create PVC object")  # ❌
    pvc = PVC()
    logger.test_step("Set PVC name")  # ❌
    pvc.name = "test"

Do: Use steps for meaningful phases

def test_good_example(self):
    logger.test_step("Create and configure PVC")
    pvc = PVC(name="test", size="10Gi")

    logger.test_step("Deploy PVC to cluster")
    pvc.create()

❌ Don’t: Redundant Messages

# Redundant with function name
def deploy_cluster(self):
    logger.test_step("Deploy cluster")  # ❌ Adds no info

Do: Add specific details

def deploy_cluster(self):
    logger.test_step("Deploy 3-node cluster with ODF storage")  # ✅

❌ Don’t: Logging Secrets

# NEVER log credentials
logger.info(f"Using password: {password}")  # ❌❌❌
logger.debug(f"API key: {api_key}")  # ❌❌❌
logger.info(f"Config: {config}")  # ❌ May contain secrets

Do: Sanitize sensitive data

logger.info("Authentication configured")  # ✅
logger.debug(f"API endpoint: {endpoint}")  # ✅
logger.debug(f"Username: {username}")  # ✅ (username OK, not password)
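
When a whole configuration dictionary is worth logging, masking sensitive values first is one option. This is a minimal sketch, assuming the config is a (possibly nested) dict with string keys; redact and SENSITIVE_MARKERS are hypothetical names, and the marker list is only an example that would need to match your own key names.

```python
import logging

logger = logging.getLogger(__name__)

# Hypothetical marker list; extend it to match your own config keys.
SENSITIVE_MARKERS = ("password", "passwd", "secret", "token", "api_key")


def redact(config):
    """Return a copy of config with values of sensitive-looking keys masked."""
    redacted = {}
    for key, value in config.items():
        if isinstance(value, dict):
            redacted[key] = redact(value)  # Recurse into nested sections
        elif any(marker in key.lower() for marker in SENSITIVE_MARKERS):
            redacted[key] = "***"
        else:
            redacted[key] = value
    return redacted


config = {
    "endpoint": "https://api.example.com",
    "auth": {"username": "admin", "password": "hunter2"},
}
logger.debug("Config: %s", redact(config))
```

The original dictionary is left untouched; only the copy passed to the logger is masked. Key-name matching is a heuristic, so it does not replace reviewing what ends up in the logs.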

❌ Don’t: String Formatting Waste

# Expensive string formatting even when DEBUG disabled
logger.debug(f"Large object: {expensive_operation()}")  # ❌

Do: Guard expensive DEBUG messages

# expensive_operation() runs only if DEBUG is enabled
if logger.isEnabledFor(logging.DEBUG):
    logger.debug(f"Large object: {expensive_operation()}")  # ✅
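
The cost difference is easy to check with a counter. The sketch below uses a stand-in expensive_operation() that records each call; the logger name and the INFO level are assumptions made for the demo.

```python
import logging

logger = logging.getLogger("demo.lazy_debug")
logger.setLevel(logging.INFO)  # DEBUG is disabled for this logger

call_log = []


def expensive_operation():
    """Stand-in for a costly call; records every invocation."""
    call_log.append("called")
    return "large payload"


# The f-string is built before logger.debug() is entered,
# so expensive_operation() runs even though the message is discarded.
logger.debug(f"Large object: {expensive_operation()}")
calls_unguarded = len(call_log)

# The guard skips the whole statement while DEBUG is disabled.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug(f"Large object: {expensive_operation()}")
calls_guarded = len(call_log) - calls_unguarded
```

With DEBUG disabled, the unguarded call still invokes expensive_operation() once, while the guarded call never invokes it.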

❌ Don’t: Generic Messages

logger.info("Processing")  # ❌ What processing?
logger.error("Failed")  # ❌ What failed?
logger.warning("Issue")  # ❌ What issue?

Do: Be specific

logger.info("Processing 50 PVC creation requests")  # ✅
logger.error("Failed to create StorageCluster: API timeout")  # ✅
logger.warning("Cluster utilization at 85% - approaching capacity")  # ✅

❌ Don’t: Wrong Log Level

# Using INFO for debug details
logger.info(f"Variable x = {x}, y = {y}, z = {z}")  # ❌

# Using ERROR for warnings
logger.error("Using default configuration")  # ❌

# Using DEBUG for important progress
logger.debug("Cluster deployment completed")  # ❌

Do: Use appropriate levels

logger.debug(f"Variable x = {x}, y = {y}, z = {z}")  # ✅
logger.warning("Using default configuration")  # ✅
logger.info("Cluster deployment completed")  # ✅

❌ Don’t: Framing Log Messages

# Using decorative lines to frame log messages
logger.info("=" * 80)  # ❌
logger.info("Starting deployment")
logger.info("=" * 80)  # ❌

logger.info("#" * 50)  # ❌
logger.info("### Important Section ###")  # ❌
logger.info("#" * 50)  # ❌

logger.info("-" * 60)  # ❌
logger.test_step("Run validation")
logger.info("-" * 60)  # ❌

Why avoid framing?

  • Creates visual noise in logs that makes scanning harder

  • Wastes log lines without adding information

  • Breaks grep/search patterns

  • Inconsistent with log aggregation tools

  • TEST_STEP already provides visual structure

Do: Use log levels and clear messages for structure

# Let TEST_STEP provide structure
logger.test_step("Starting deployment")  # ✅
logger.info("Deploying control plane nodes")
logger.info("Deploying worker nodes")

# Use clear, informative messages
logger.info("=== Beginning validation phase ===")  # ✅ If you must, include in message
logger.test_step("Run validation")  # ✅ Better: use TEST_STEP

# Group related operations with consistent prefixes
logger.info("Validation: Checking cluster health")  # ✅
logger.info("Validation: Verifying storage")  # ✅
logger.info("Validation: Testing connectivity")  # ✅

❌ Don’t: Start Log Messages with Newlines

# Starting log messages with newlines
logger.info("\nStarting deployment")  # ❌
logger.error("\n\nFailed to create PVC")  # ❌
logger.test_step("\nCreate resources")  # ❌

# Multiple newlines for spacing
logger.info("\n\n\n=== Important Section ===")  # ❌

Why avoid leading newlines?

  • Breaks log formatting and timestamp alignment

  • Creates empty lines in log files that waste space

  • Disrupts log aggregation and parsing tools

  • Makes automated log analysis difficult

  • Inconsistent with standard logging practices

  • Hard to grep/search for messages

Do: Let the logging framework handle formatting

# Clean, properly formatted messages
logger.info("Starting deployment")  # ✅
logger.error("Failed to create PVC")  # ✅
logger.test_step("Create resources")  # ✅

# Use TEST_STEP for visual separation
logger.test_step("Important Section")  # ✅

# Or use clear section markers within messages
logger.info("=== Important Section ===")  # ✅ If you must

What happens with leading newlines:

2026-04-22 10:30:15 INFO - Processing resources
2026-04-22 10:30:16 INFO -
Starting deployment  # ❌ Timestamp and message separated
2026-04-22 10:30:17 INFO - Deployment in progress

Clean formatting without newlines:

2026-04-22 10:30:15 INFO - Processing resources
2026-04-22 10:30:16 INFO - Starting deployment  # ✅ Clean alignment
2026-04-22 10:30:17 INFO - Deployment in progress
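
The split can be reproduced with the standard library alone. This sketch writes to an in-memory stream and uses a simplified format without timestamps, which is an assumption made to keep the output deterministic; the framework's real format differs.

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))

logger = logging.getLogger("demo.leading_newline")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)

logger.info("Processing resources")
logger.info("\nStarting deployment")  # Leading newline splits the record

lines = stream.getvalue().splitlines()
```

The second record ends up on two physical lines: the prefix alone, then the message without any level or timestamp. A grep for "INFO" would therefore miss the actual message text.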

Special Topics

Assertions: Before or After?

Best practice: Log assertion BEFORE the assert statement

# ✅ Recommended: Log before assert
logger.assertion(f"PVC status: expected='Bound', actual='{pvc.status}'")
assert pvc.status == "Bound", f"PVC not bound: {pvc.status}"

# ❌ Less useful: Log after assert (might not execute)
assert pvc.status == "Bound", f"PVC not bound: {pvc.status}"
logger.assertion("PVC status check passed")  # Never logs if assertion fails

Why? Logging before ensures the assertion context is captured even if the assert fails.

Exception Logging

Python’s logging module provides logger.exception(), which logs at ERROR level and includes the full traceback automatically. It is the preferred way to log exceptions inside except blocks.

Using logger.exception()

Best practice: Use logger.exception() in exception handlers

# ✅ Recommended: Use logger.exception() in except blocks
try:
    risky_operation()
except Exception as e:
    logger.exception(f"Operation failed: {e}")
    raise

# Equivalent to:
try:
    risky_operation()
except Exception as e:
    logger.error(f"Operation failed: {e}", exc_info=True)
    raise

Key points:

  • logger.exception() automatically includes the full traceback

  • Should only be called from exception handlers (inside except blocks)

  • Logs at ERROR level

  • Always includes exc_info=True automatically
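
These points can be verified by capturing the output in memory. A minimal sketch with a simplified format string (an assumption for the demo, not the framework's format):

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))

logger = logging.getLogger("demo.exception")
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(handler)


def risky_operation():
    """Stand-in for an operation that fails."""
    raise ValueError("disk quota exceeded")


try:
    risky_operation()
except ValueError:
    # No exc_info argument needed: logger.exception() attaches the traceback
    logger.exception("Operation failed")

output = stream.getvalue()
```

The captured text starts with the ERROR line, followed by the traceback and the exception type and message.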

Exception Logging Patterns

Pattern 1: Simple exception logging

def create_pvc(name, size):
    """Create PVC with exception handling"""
    try:
        pvc = PVC(name=name, size=size)
        pvc.create()
        logger.info(f"PVC '{name}' created successfully")
        return pvc
    except Exception:
        logger.exception(f"Failed to create PVC '{name}'")
        raise

Pattern 2: Specific exception types

def deploy_cluster():
    """Deploy cluster with specific exception handling"""
    try:
        initialize_cluster()
        deploy_nodes()
        configure_storage()
    except TimeoutError as e:
        logger.exception(f"Timeout during cluster deployment: {e}")
        raise
    except APIError as e:
        logger.exception(f"API error during cluster deployment: {e}")
        logger.error(f"API response: {e.response}")
        raise
    except Exception as e:
        logger.exception(f"Unexpected error during cluster deployment: {e}")
        raise

Pattern 3: Exception with context (no re-raise)

def cleanup_resources(resource_list):
    """Cleanup resources, continue on individual failures"""
    failed = []

    for resource in resource_list:
        try:
            delete_resource(resource)
            logger.info(f"Deleted resource: {resource.name}")
        except Exception as e:
            # Log exception but don't re-raise - continue cleanup
            logger.exception(
                f"Failed to delete resource '{resource.name}': {e}"
            )
            failed.append(resource.name)

    if failed:
        logger.warning(f"Failed to delete {len(failed)} resources: {failed}")

    return failed

Pattern 4: DEBUG vs ERROR exception logging

def optional_operation():
    """Operation that may fail without impacting main flow"""
    try:
        perform_optional_task()
    except Exception as e:
        # Use DEBUG for non-critical failures
        logger.debug(f"Optional operation failed: {e}", exc_info=True)
        # Continue execution

When to Use exc_info=True vs logger.exception()

# ✅ In except block: Use logger.exception()
try:
    operation()
except Exception as e:
    logger.exception("Operation failed")  # Preferred

# ✅ Outside except block: pass a stored exception as exc_info
# (exc_info=True only finds an exception while one is being handled)
def report_failure(stored_error):
    logger.error("Error detected", exc_info=stored_error)

# ✅ For DEBUG level with traceback
try:
    operation()
except Exception as e:
    logger.debug("Operation failed", exc_info=True)  # Can't use .exception() for DEBUG
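
One detail worth checking: exc_info=True reads the exception that is currently being handled, so outside an except block there is nothing for it to attach (the standard library typically renders this as "NoneType: None"). Passing a stored exception instance as exc_info works at any level and at any point. A minimal sketch, with the logger name and format chosen for the demo:

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))

logger = logging.getLogger("demo.exc_info")
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(handler)

stored_error = None
try:
    int("not-a-number")
except ValueError as e:
    stored_error = e  # Keep the exception for later reporting

# No exception is being handled here, so exc_info=True has no traceback to add.
logger.error("Reported with exc_info=True", exc_info=True)
first = stream.getvalue()

# An exception instance carries its own traceback and can be logged later.
logger.warning("Reported with stored exception", exc_info=stored_error)
second = stream.getvalue()[len(first):]
```

Only the second record contains a real traceback. This is useful when failures are collected during a loop and reported together afterwards.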

Complete Exception Handling Example

def test_cluster_upgrade(self):
    """Test cluster upgrade with comprehensive error handling"""

    logger.test_step("Start cluster upgrade")

    try:
        # Pre-upgrade validation
        logger.info("Running pre-upgrade validation")
        validate_cluster_health()

        # Perform upgrade
        logger.info("Updating operator subscription")
        update_subscription(channel="stable-4.16")

        logger.info("Waiting for upgrade completion")
        wait_for_upgrade_complete(timeout=3600)

        # Post-upgrade validation
        logger.info("Running post-upgrade validation")
        validate_cluster_health()

        logger.info("Cluster upgrade completed successfully")

    except TimeoutError as e:
        logger.exception(f"Upgrade timed out: {e}")
        logger.error("Collecting must-gather for debugging")
        collect_must_gather()
        raise

    except ValidationError as e:
        logger.exception(f"Cluster validation failed: {e}")
        logger.error(f"Cluster state: {get_cluster_state()}")
        raise

    except Exception as e:
        logger.exception(f"Unexpected error during upgrade: {e}")
        logger.error("Attempting to collect diagnostic information")
        try:
            collect_diagnostics()
        except Exception as diag_error:
            logger.warning(f"Could not collect diagnostics: {diag_error}")
        raise

Example output from logger.exception() (shown for the create_pvc example in Pattern 1):

ERROR - Failed to create PVC 'test-pvc'
Traceback (most recent call last):
  File "test.py", line 123, in create_pvc
    pvc.create()
  File "pvc.py", line 45, in create
    response = api.post(...)
APIError: 500 Internal Server Error

Performance-Sensitive Code

Avoid expensive operations in log messages:

# ❌ Bad: Expensive operation runs even if DEBUG disabled
logger.debug(f"Data: {expensive_serialization(large_object)}")

# ✅ Good: Only runs if DEBUG enabled
if logger.isEnabledFor(logging.DEBUG):
    logger.debug(f"Data: {expensive_serialization(large_object)}")

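# ✅ Alternative: Lazy evaluation via an object whose __str__ does the work
# (passing a bare lambda as a %s argument would log the lambda's repr, not call it)
class LazyStr:
    def __init__(self, func):
        self.func = func

    def __str__(self):
        return str(self.func())

logger.debug("Data: %s", LazyStr(lambda: expensive_serialization(large_object)))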

Multi-line Log Messages

Use proper formatting for readability:

# ✅ Good: Multi-line with clear structure
logger.error(
    "Cluster validation failed:\n"
    f"  Operators ready: {operators_ready}\n"
    f"  Pods running: {pods_running}\n"
    f"  Storage healthy: {storage_healthy}\n"
    f"  Ceph status: {ceph_status}"
)

# ✅ Alternative: Multiple log calls
logger.error("Cluster validation failed:")
logger.error(f"  Operators ready: {operators_ready}")
logger.error(f"  Pods running: {pods_running}")
logger.error(f"  Storage healthy: {storage_healthy}")

Structured Logging Context

Include context for easier log parsing:

# ✅ Good: Structured information
logger.info(
    f"PVC operation: action=create, name={name}, size={size}, "
    f"storage_class={sc}, namespace={namespace}"
)

# Can be parsed/filtered easily:
# grep "action=create" logs.txt
# grep "storage_class=ocs-storagecluster-ceph-rbd" logs.txt
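
A small helper can keep the key=value layout consistent across call sites. This is a sketch only; kv_message is a hypothetical name, not an existing OCS-CI function, and the field values are sample data.

```python
import logging

logger = logging.getLogger(__name__)


def kv_message(prefix, **fields):
    """Build 'prefix: key=value, key=value' with fields in call order."""
    pairs = ", ".join(f"{key}={value}" for key, value in fields.items())
    return f"{prefix}: {pairs}"


message = kv_message(
    "PVC operation",
    action="create",
    name="pvc-test",
    size="10Gi",
    storage_class="ocs-storagecluster-ceph-rbd",
    namespace="openshift-storage",
)
logger.info(message)
```

Keyword arguments keep their call order, so the fields appear in the log in the order they were passed.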

Testing Your Logging

Verify logging in unit tests:

def test_function_logging(caplog):
    """Test that function logs appropriately"""

    with caplog.at_level(logging.INFO):
        result = my_function()

    # Verify expected log messages
    assert "Starting operation" in caplog.text
    assert "Operation completed" in caplog.text

    # Verify log levels
    assert any(
        record.levelname == "TEST_STEP" and "Deploy resources" in record.getMessage()
        for record in caplog.records
    )

Quick Reference

When to Use Each Level

| Scenario | Level | Method |
|---|---|---|
| Test phase starts | TEST_STEP | logger.test_step() |
| Assertion check | ASSERTION | logger.assertion() |
| Resource created successfully | INFO | logger.info() |
| Operation progress update | INFO | logger.info() |
| API request/response details | DEBUG | logger.debug() |
| Variable values during debugging | DEBUG | logger.debug() |
| Retry attempt | WARNING | logger.warning() |
| Deprecated feature used | WARNING | logger.warning() |
| Operation failed | ERROR | logger.error() |
| Exception caught (with traceback) | ERROR | logger.exception() |
| Test failed | ERROR | logger.error() |
| Framework crash | CRITICAL | logger.critical() |
| ML prediction made | AI_DATA | logger.ai_data() |

Common Commands

# Standard imports
import logging
logger = logging.getLogger(__name__)

# Custom levels
logger.test_step("Major test phase")
logger.assertion("expected='X', actual='Y'")
logger.ai_data("ML prediction: ...")

# Standard levels
logger.info("Operation progress")
logger.debug("Detailed diagnostics")
logger.warning("Potential issue")
logger.error("Operation failed")
logger.critical("System failure")

# Exception logging (use in except blocks)
try:
    risky_operation()
except Exception as e:
    logger.exception(f"Operation failed: {e}")  # Includes traceback
    raise

# Reset step counter (for loops)
from ocs_ci.framework.custom_logger import reset_step_counts
reset_step_counts(__name__)

# Check if level enabled (performance)
if logger.isEnabledFor(logging.DEBUG):
    logger.debug(f"Expensive: {expensive_call()}")

Summary

  1. Use the right level: TEST_STEP for phases, ASSERTION for validations, INFO for progress, DEBUG for details

  2. Use logger.exception(): Prefer it in exception handlers so the traceback is captured automatically; for levels below ERROR, pass exc_info=True instead

  3. Be specific: Include context, values, and outcomes in log messages

  4. Think about the reader: Your logs should tell the story of what happened

  5. Don’t over-log: Too many logs hide important information

  6. Never log secrets: Sanitize sensitive data

  7. Test your logging: Verify logs provide useful debugging information

Following these guidelines helps create logs that are:

  • Scannable - Easy to find important information

  • Debuggable - Provide enough context to diagnose issues

  • Consistent - Follow predictable patterns across the codebase

  • Actionable - Clear what succeeded, failed, or needs attention


Automation

The /update-logging skill can automatically apply these guidelines to Python files:

/update-logging tests/functional/test_example.py

The skill reads this guide as its authoritative source and applies the patterns documented here. See .claude/skills/update-logging/SKILL.md for details.