Advanced Termination Techniques
Decorators for Function Termination
Timeout Decorator
import signal
import functools
def timeout(seconds):
    """Build a decorator that aborts the wrapped call after ``seconds``.

    The limit is enforced with ``SIGALRM``, so it only works on Unix and
    only when the decorated function is called from the main thread.

    Args:
        seconds: Time limit in seconds. Fractional values are accepted
            because the timer is armed with ``signal.setitimer``. A value
            of ``0`` disables the timer, so the call runs without a limit.

    Returns:
        A decorator. The decorated function returns whatever the wrapped
        function returns.

    Raises:
        TimeoutError: From inside the wrapped call, once the limit expires.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            def handler(signum, frame):
                raise TimeoutError(
                    f"Function call timed out after {seconds} seconds"
                )

            # Remember whatever handler was installed before so it can be
            # put back; otherwise every call would permanently hijack
            # SIGALRM for the rest of the program.
            previous_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.setitimer(signal.ITIMER_REAL, seconds)
                return func(*args, **kwargs)
            finally:
                # Disarm first so a late alarm cannot hit the restored handler.
                signal.setitimer(signal.ITIMER_REAL, 0)
                # ``None`` means the old handler was not installed from
                # Python and cannot be re-installed through signal.signal().
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        return wrapper

    return decorator
@timeout(2)
def long_running_function():
    """Placeholder for work that might take too long.

    The body is intentionally empty; the ``timeout(2)`` decorator wraps
    whatever work is placed here.
    """
    return None
Contextual Termination Strategies
Context Managers
class TerminationManager:
    """Context manager that caps how many times a loop may iterate.

    Call ``check_termination()`` once per iteration; the call that exceeds
    ``max_iterations`` raises ``StopIteration``.
    """

    def __init__(self, max_iterations=100):
        """Start with a zeroed counter and the given iteration budget."""
        self.current_iteration = 0
        self.max_iterations = max_iterations

    def __enter__(self):
        """Return the manager itself as the ``with`` target."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Placeholder for cleanup or logging logic.

        Returns ``None``, so any exception raised inside the ``with`` body
        keeps propagating.
        """
        return None

    def check_termination(self):
        """Count one iteration and stop once the budget is exceeded.

        Raises:
            StopIteration: When the counter goes above ``max_iterations``.
        """
        self.current_iteration += 1
        if self.current_iteration <= self.max_iterations:
            return
        raise StopIteration("Maximum iterations reached")
def complex_computation():
    """Run an open-ended loop guarded by a ``TerminationManager``.

    The loop has no exit condition of its own. It ends when
    ``check_termination()`` raises, and that exception propagates to the
    caller.
    """
    manager = TerminationManager()
    with manager:
        while True:
            # Ask the manager first, so the budget is enforced before any
            # work is done in this iteration.
            manager.check_termination()
            # Computation logic goes here.
Termination Flow Visualization
graph TD
A[Start Execution] --> B{Termination Conditions}
B -->|Soft Termination| C[Graceful Exit]
B -->|Hard Termination| D[Immediate Stop]
C --> E[Resource Cleanup]
D --> F[Force Shutdown]
Advanced Exception Handling
Custom Termination Exceptions
class TerminationException(Exception):
    """Exception signalling that processing must terminate.

    Attributes set in ``__init__``:
        message: Human-readable description, also passed to ``Exception``.
        error_code: Optional code supplied by the raiser; defaults to ``None``.
    """

    def __init__(self, message, error_code=None):
        super().__init__(message)
        self.error_code = error_code
        self.message = message
def critical_process():
    """Run the critical step and report a termination instead of raising.

    When ``critical_condition`` is truthy, a ``TerminationException`` with
    error code 500 is raised and caught here, and its message is printed.
    NOTE(review): ``critical_condition`` is not defined in this snippet;
    it is presumably supplied by the surrounding module.
    """
    try:
        # Complex processing goes here.
        if not critical_condition:
            return
        raise TerminationException("Critical error detected", error_code=500)
    except TerminationException as e:
        # Custom error handling: report the termination message.
        print(f"Termination: {e.message}")
Termination Method Comparison
| Technique | Complexity | Use Case | Performance Impact |
|---|---|---|---|
| Decorators | High | Complex control flow | Moderate overhead |
| Context Managers | Medium | Resource management | Low overhead |
| Custom Exceptions | Low | Error handling | Minimal overhead |
Asynchronous Termination
Concurrent Function Stopping
import asyncio
async def interruptible_task():
    """Sleep for ten seconds while cooperating with cancellation.

    Raises:
        asyncio.CancelledError: Re-raised after cleanup when the task is
            cancelled, so the task is marked as cancelled instead of
            appearing to have finished normally.
    """
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Cleanup logic when the task is cancelled goes here.
        # Re-raise: swallowing CancelledError hides the cancellation from
        # whoever awaits the task.
        raise
async def main(delay=5):
    """Start ``interruptible_task``, cancel it after ``delay`` seconds, and
    wait for the cancellation to finish.

    Args:
        delay: Seconds to wait before cancelling the task. Defaults to 5.
    """
    task = asyncio.create_task(interruptible_task())
    await asyncio.sleep(delay)
    task.cancel()  # Only requests cancellation; it takes effect when awaited.
    try:
        # Wait for the task so its cleanup has run before main() returns.
        await task
    except asyncio.CancelledError:
        # Expected when the task propagates its cancellation.
        pass
Best Practices
- Design flexible termination mechanisms
- Minimize resource leaks
- Provide clear error reporting
- Use the termination strategy appropriate to each situation
LabEx encourages developers to master these advanced techniques for robust Python programming.