Codebase list cinder-tempest-plugin / 342caf9 cinder_tempest_plugin / common / concurrency.py
342caf9

Tree @342caf9 (Download .tar.gz)

concurrency.py @342caf9raw · history · blame

# Copyright 2025 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.

import multiprocessing

from tempest import config

# Module-level handle on tempest's configuration object; this module reads
# CONF.volume.concurrent_resource_count from it.
CONF = config.CONF


def _run_worker(target, errors, index, resource_ids, kwargs):
    """Call ``target`` inside a worker process and record any exception.

    This lives at module level (instead of being a closure) so that it can
    be pickled when the multiprocessing start method is ``spawn`` or
    ``forkserver``; nested functions cannot be pickled.

    :param target: callable invoked as
        ``target(index, resource_ids, **kwargs)``.
    :param errors: shared list proxy that failure messages are appended to.
    :param index: zero-based number of this worker.
    :param resource_ids: shared list proxy handed through to ``target``.
    :param kwargs: dict of extra keyword arguments for ``target``.
    """
    try:
        target(index, resource_ids, **kwargs)
    except Exception as e:
        # An exception raised in a child never reaches the parent on its
        # own, so push a message into the shared list for it to report.
        errors.append(f"Worker {index} failed: {str(e)}")


def run_concurrent_tasks(target, **kwargs):
    """Run a target function concurrently using multiprocessing.

    One process is started per unit of
    ``CONF.volume.concurrent_resource_count``. Each process calls
    ``target(index, resource_ids, **kwargs)`` where ``index`` is the
    zero-based worker number and ``resource_ids`` is a manager-backed list
    shared by all workers.

    :param target: callable executed in every worker process.
    :param kwargs: extra keyword arguments forwarded to ``target``.
    :returns: a plain ``list`` copy of the shared ``resource_ids`` list
        taken after all workers have finished.
    :raises RuntimeError: if any worker raised an exception or exited with
        a non-zero exit code.
    """
    resource_count = CONF.volume.concurrent_resource_count

    # The context manager shuts the manager's server process down on exit
    # instead of leaving that to garbage collection.
    with multiprocessing.Manager() as manager:
        resource_ids = manager.list()
        # To capture exceptions
        errors = manager.list()

        processes = []
        try:
            for i in range(resource_count):
                p = multiprocessing.Process(
                    target=_run_worker,
                    args=(target, errors, i, resource_ids, kwargs)
                )
                p.start()
                # Only track processes that really started; join() on an
                # unstarted process raises.
                processes.append(p)
        finally:
            # Also reached when a start() call fails, so workers that are
            # already running are not abandoned.
            for p in processes:
                p.join()

        failures = list(errors)
        for index, p in enumerate(processes):
            # A worker that was killed or left via SystemExit never gets to
            # record an error, so its exit code is the only evidence.
            if p.exitcode != 0:
                failures.append(
                    f"Worker {index} exited abnormally "
                    f"(exit code {p.exitcode})")

        # Copy out of the proxy while the manager is still alive.
        results = list(resource_ids)

    if failures:
        error_msg = "\n".join(failures)
        raise RuntimeError(
            f"One or more concurrent tasks failed:\n{error_msg}")

    return results