Give AlbumentationsX a star on GitHub β€” it powers this leaderboard

Star on GitHub

multitasking

Non-blocking Python methods using decorators

Rank: #1603Downloads: 5,840,675 (30 days)Stars: 230Forks: 46

Description

MultiTasking: Non-blocking Python methods using decorators
==========================================================

|Python version| |PyPi version| |PyPi status| |PyPi downloads|
|CodeFactor| |Star this repo| |Follow me on twitter|

--------------

**MultiTasking** is a lightweight Python library that lets you convert
your Python methods into asynchronous, non-blocking methods simply by
using a decorator. Perfect for I/O-bound tasks, API calls, web scraping,
and any scenario where you want to run multiple operations concurrently
without the complexity of manual thread or process management.

✨ **What's New in v0.0.12**
----------------------------

-  🎯 **Full Type Hint Support**: Complete type annotations for better
   IDE support and code safety
-  πŸ“š **Enhanced Documentation**: Comprehensive docstrings and inline
   comments for better maintainability
-  πŸ”§ **Improved Error Handling**: More robust exception handling with
   specific error types
-  πŸš€ **Better Performance**: Optimized task creation and management
   logic
-  πŸ›‘οΈ **Code Quality**: PEP8 compliant, linter-friendly codebase

Quick Start
-----------

.. code:: python

   import multitasking
   import time

   @multitasking.task
   def fetch_data(url_id):
       # Simulate API call or I/O operation
       time.sleep(1)
       return f"Data from {url_id}"

   # These run concurrently, not sequentially!
   for i in range(5):
       fetch_data(i)

   # Wait for all tasks to complete
   multitasking.wait_for_tasks()
   print("All data fetched!")

Basic Example
-------------

.. code:: python

   # example.py
   import multitasking
   import time
   import random
   import signal

   # Kill all tasks on ctrl-c (recommended for development)
   signal.signal(signal.SIGINT, multitasking.killall)

   # Or, wait for tasks to finish gracefully on ctrl-c:
   # signal.signal(signal.SIGINT, multitasking.wait_for_tasks)

   @multitasking.task  # <== this is all it takes! πŸŽ‰
   def hello(count):
       sleep_time = random.randint(1, 10) / 2
       print(f"Hello {count} (sleeping for {sleep_time}s)")
       time.sleep(sleep_time)
       print(f"Goodbye {count} (slept for {sleep_time}s)")

   if __name__ == "__main__":
       # Launch 10 concurrent tasks
       for i in range(10):
           hello(i + 1)

       # Wait for all tasks to complete
       multitasking.wait_for_tasks()
       print("All tasks completed!")

**Output:**

.. code:: bash

   $ python example.py

   Hello 1 (sleeping for 0.5s)
   Hello 2 (sleeping for 1.0s)
   Hello 3 (sleeping for 5.0s)
   Hello 4 (sleeping for 0.5s)
   Hello 5 (sleeping for 2.5s)
   Hello 6 (sleeping for 3.0s)
   Hello 7 (sleeping for 0.5s)
   Hello 8 (sleeping for 4.0s)
   Hello 9 (sleeping for 3.0s)
   Hello 10 (sleeping for 1.0s)
   Goodbye 1 (slept for 0.5s)
   Goodbye 4 (slept for 0.5s)
   Goodbye 7 (slept for 0.5s)
   Goodbye 2 (slept for 1.0s)
   Goodbye 10 (slept for 1.0s)
   Goodbye 5 (slept for 2.5s)
   Goodbye 6 (slept for 3.0s)
   Goodbye 9 (slept for 3.0s)
   Goodbye 8 (slept for 4.0s)
   Goodbye 3 (slept for 5.0s)
   All tasks completed!

Advanced Usage
==============

Real-World Examples
-------------------

**Web Scraping with Concurrent Requests:**

.. code:: python

   import multitasking
   import requests
   import signal

   signal.signal(signal.SIGINT, multitasking.killall)

   @multitasking.task
   def fetch_url(url):
       try:
           response = requests.get(url, timeout=10)
           print(f"βœ… {url}: {response.status_code}")
           return response.text
       except Exception as e:
           print(f"❌ {url}: {str(e)}")
           return None

   # Fetch multiple URLs concurrently
   urls = [
       "https://httpbin.org/delay/1",
       "https://httpbin.org/delay/2",
       "https://httpbin.org/status/200",
       "https://httpbin.org/json"
   ]

   for url in urls:
       fetch_url(url)

   multitasking.wait_for_tasks()
   print(f"Processed {len(urls)} URLs concurrently!")

**Database Operations:**

.. code:: python

   import multitasking
   import sqlite3
   import time

   @multitasking.task
   def process_batch(batch_id, data_batch):
       # Simulate database processing
       conn = sqlite3.connect(f'batch_{batch_id}.db')
       # ... database operations ...
       conn.close()
       print(f"Processed batch {batch_id} with {len(data_batch)} records")

   # Process multiple data batches concurrently
   large_dataset = list(range(1000))
   batch_size = 100

   for i in range(0, len(large_dataset), batch_size):
       batch = large_dataset[i:i + batch_size]
       process_batch(i // batch_size, batch)

   multitasking.wait_for_tasks()

Pool Management
---------------

MultiTasking uses execution pools to manage concurrent tasks. You can
create and configure multiple pools for different types of operations:

.. code:: python

   import multitasking

   # Create a pool for API calls (higher concurrency)
   multitasking.createPool("api_pool", threads=20, engine="thread")

   # Create a pool for CPU-intensive tasks (lower concurrency)
   multitasking.createPool("cpu_pool", threads=4, engine="process")

   # Switch between pools
   multitasking.use_tag("api_pool")  # Future tasks use this pool

   @multitasking.task
   def api_call(endpoint):
       # This will use the api_pool
       pass

   # Get pool information
   pool_info = multitasking.getPool("api_pool")
   print(f"Pool: {pool_info}")  # {'engine': 'thread', 'name': 'api_pool', 'threads': 20}

Task Monitoring
---------------

Monitor and control your tasks with built-in functions:

.. code:: python

   import multitasking
   import time

   @multitasking.task
   def long_running_task(task_id):
       time.sleep(2)
       print(f"Task {task_id} completed")

   # Start some tasks
   for i in range(5):
       long_running_task(i)

   # Monitor active tasks
   while multitasking.get_active_tasks():
       active_count = len(multitasking.get_active_tasks())
       total_count = len(multitasking.get_list_of_tasks())
       print(f"Progress: {total_count - active_count}/{total_count} completed")
       time.sleep(0.5)

   print("All tasks finished!")

Configuration & Settings
========================

Thread/Process Limits
---------------------

The default maximum threads equals the number of CPU cores. You can
customize this:

.. code:: python

   import multitasking

   # Set maximum concurrent tasks
   multitasking.set_max_threads(10)

   # Scale based on CPU cores (good rule of thumb for I/O-bound tasks)
   multitasking.set_max_threads(multitasking.config["CPU_CORES"] * 5)

   # Unlimited concurrent tasks (use carefully!)
   multitasking.set_max_threads(0)

Execution Engine Selection
--------------------------

Choose between threading and multiprocessing based on your use case:

.. code:: python

   import multitasking

   # For I/O-bound tasks (default, recommended for most cases)
   multitasking.set_engine("thread")

   # For CPU-bound tasks (avoids GIL limitations)
   multitasking.set_engine("process")

**When to use threads vs processes:**

-  **Threads** (default): Best for I/O-bound tasks like file operations,
   network requests, database queries
-  **Processes**: Best for CPU-intensive tasks like mathematical
   computations, image processing, data analysis

Advanced Pool Configuration
---------------------------

Create specialized pools for different workloads:

.. code:: python

   import multitasking

   # Fast pool for quick API calls
   multitasking.createPool("fast_api", threads=50, engine="thread")

   # CPU pool for heavy computation
   multitasking.createPool("compute", threads=2, engine="process")

   # Unlimited pool for lightweight tasks
   multitasking.createPool("unlimited", threads=0, engine="thread")

   # Get current pool info
   current_pool = multitasking.getPool()
   print(f"Using pool: {current_pool['name']}")

Best Practices
==============

Performance Tips
----------------

1. **Choose the right engine**: Use threads for I/O-bound tasks,
   processes for CPU-bound tasks
2. **Tune thread counts**: Start with CPU cores Γ— 2-5 for I/O tasks, CPU
   cores for CPU tasks
3. **Use pools wisely**: Create separate pools for different types of
   operations
4. **Monitor memory usage**: Each thread/process consumes memory
5. **Handle exceptions**: Always wrap risky operations in try-catch
   blocks

Error Handling
--------------

.. code:: python

   import multitasking
   import requests

   @multitasking.task
   def robust_fetch(url):
       try:
           response = requests.get(url, timeout=10)
           response.raise_for_status()
           return response.json()
       except requests.exceptions.Timeout:
           print(f"⏰ Timeout fetching {url}")
       except requests.exceptions.RequestException as e:
           print(f"❌ Error fetching {url}: {e}")
       except Exception as e:
           print(f"πŸ’₯ Unexpected error: {e}")
       return None

Resource Management
-------------------

.. code:: python

   import multitasking
   import signal

   # Graceful shutdown on interrupt
   def cleanup_handler(signum, frame):
       print("πŸ›‘ Shutting down gracefully...")
       multitasking.wait_for_tasks()
       print("βœ… All tasks completed")
       exit(0)

   signal.signal(signal.SIGINT, cleanup_handler)

   # Your application code here...

Troubleshooting
===============

Common Issues
-------------

**Tasks not running concurrently?** Check if you’re calling
``wait_for_tasks()`` inside your task loop instead of after it.

**High memory usage?** Reduce the number of concurrent threads or switch
to a process-based engine.

**Tasks hanging?** Ensure your tasks can complete (avoid infinite loops)
and handle exceptions properly.

**Import errors?** Make sure you’re using Python 3.6+ and have installed
the latest version.

Debugging
---------

.. code:: python

   import multitasking

   # Enable task monitoring
   active_tasks = multitasking.get_active_tasks()
   all_tasks = multitasking.get_list_of_ta