DRAFT Streamlit async operations require careful h

June 16, 2025

                                                                           

Streamlit async operations require careful handling due to synchronous execution model

Streamlit’s architecture fundamentally relies on a synchronous, top-to-bottom execution model. It reruns scripts on each user interaction. This creates significant challenges for implementing long-running async tasks and background processes. While Streamlit does not officially support multithreading in application code as of 2025, the framework has made notable improvements. These include event loop reuse (v1.31+), async generator support in st.write_stream, and fragment-based partial updates that enable more sophisticated async patterns.

How to properly run background tasks without blocking the UI

The most effective approach for non-blocking background operations combines pre-created UI containers. It also uses proper thread context management. The pre-layout pattern has emerged as the community standard. It requires all Streamlit widgets to be created before initiating async operations:

import streamlit as st
import asyncio
from streamlit.runtime.scriptrunner import add_script_run_ctx
import threading

def main():
    # Create all UI elements first
    progress_container = st.empty()
    result_container = st.empty()
    status_text = st.empty()

    # Then run async operations
    def background_task():
        for i in range(100):
            with progress_container:
                st.progress(i + 1, text=f"Processing... {i+1}%")
            time.sleep(0.1)

        with result_container:
            st.success("Task completed!")

    if st.button("Start Background Task"):
        thread = threading.Thread(target=background_task)
        add_script_run_ctx(thread)  # Critical for thread context
        thread.start()

For production applications, the concurrent.futures ThreadPoolExecutor provides a higher-level interface. It handles thread management more safely:

from concurrent.futures import ThreadPoolExecutor, as_completed

def run_concurrent_tasks(data_list):
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(process_item, item) for item in data_list]

        # Non-blocking result collection
        for future in as_completed(futures):
            yield future.result()


# Usage with progress tracking
if st.button("Process Data"):
    with st.spinner("Processing..."):
        results = list(run_concurrent_tasks(data))
        st.success(f"Processed {len(results)} items")

Common issues with asyncio.run() in Streamlit and solutions

The most frequently encountered error is “RuntimeError: Event loop is already running”. This happens because Streamlit operates within Tornado’s event loop. Direct use of asyncio.run() fails in this environment. The community has developed robust solutions:

import asyncio
from concurrent.futures import ThreadPoolExecutor

def run_async_task(async_func, *args):
    """
    Safely run async functions in Streamlit environment
    """
    try:
        # Check if event loop is already running
        loop = asyncio.get_running_loop()
        # Use ThreadPoolExecutor to run in separate thread
        with ThreadPoolExecutor(1) as pool:
            future = pool.submit(lambda: asyncio.run(async_func(*args)))
            return future.result()
    except RuntimeError:
        # No event loop running, safe to use asyncio.run
        return asyncio.run(async_func(*args))


# Alternative: Event loop management in session state
if 'event_loop' not in st.session_state:
    try:
        st.session_state.event_loop = asyncio.get_event_loop()
    except RuntimeError:
        st.session_state.event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(st.session_state.event_loop)

The st.write_stream feature introduced in 2024 provides native async generator support. It automatically converts them to synchronous generators internally:

async def async_data_generator():
    for i in range(10):
        yield f"Processing item {i}"
        await asyncio.sleep(0.1)


# Streamlit handles async-to-sync conversion automatically
st.write_stream(async_data_generator())

Thread-safe approaches for updating session state

Thread safety in Streamlit requires careful handling. This is due to the framework’s internal architecture. The ScriptRunContext attachment pattern enables threads to safely interact with Streamlit’s API:

import threading
from streamlit.runtime.scriptrunner import add_script_run_ctx, get_script_run_ctx

class ThreadSafeStateManager:
    def __init__(self):
        self._lock = threading.RLock()

    def update_state(self, key, value):
        with self._lock:
            st.session_state[key] = value

    def run_background_task(self, target_func):
        thread = threading.Thread(target=target_func)
        ctx = get_script_run_ctx()
        add_script_run_ctx(thread, ctx)  # Enable st.* calls in thread
        thread.start()
        return thread


# Usage pattern
state_manager = ThreadSafeStateManager()

def background_computation():
    result = expensive_calculation()
    state_manager.update_state('result', result)
    st.toast('Computation complete!')  # Works due to context attachment

if st.button("Start Background Task"):
    state_manager.run_background_task(background_computation)

For production applications, queue-based communication provides the safest pattern for thread-to-main communication:

import queue

if 'result_queue' not in st.session_state:
    st.session_state.result_queue = queue.Queue()

def background_worker(result_queue):
    # Perform work without touching Streamlit directly
    result = perform_computation()
    result_queue.put(result)


# Check for results without blocking
try:
    result = st.session_state.result_queue.get_nowait()
    st.write(f"Result: {result}")
except queue.Empty:
    st.info("Processing...")

Real-time progress tracking examples

Modern Streamlit applications can implement sophisticated progress tracking. They use multiple patterns. The fragment-based auto-refresh pattern (Streamlit 1.37+) enables partial page updates:

@st.fragment(run_every="1s")  # Auto-refresh every second
def progress_monitor():
    if st.session_state.get('task_running'):
        progress = calculate_progress()
        st.progress(progress, text=f"Processing: {progress}%")

        if progress >= 100:
            st.session_state.task_running = False
            st.success("Task completed!")


# Multiple progress bars for complex operations
def multi_stage_process():
    stages = ['Data Loading', 'Processing', 'Model Training', 'Validation']
    progress_bars = {stage: st.progress(0, text=stage) for stage in stages}

    for stage_idx, stage in enumerate(stages):
        for i in range(100):
            progress_bars[stage].progress(i + 1, text=f"{stage}: {i+1}%")
            time.sleep(0.01)

        # Clean up completed stage
        progress_bars[stage].empty()
        st.success(f"✓ {stage} complete")

The st.status container provides an elegant solution for grouped progress tracking:

with st.status("Processing pipeline...", expanded=True) as status:
    st.write("Initializing...")
    progress = st.progress(0)

    for phase, total_steps in [("Loading", 50), ("Processing", 30), ("Finalizing", 20)]:
        st.write(f"{phase}...")
        for i in range(total_steps):
            progress.progress((i + 1) / total_steps, text=f"{phase}: {i+1}/{total_steps}")
            time.sleep(0.02)
        progress.empty()

    status.update(label="Pipeline complete!", state="complete", expanded=False)

Proper use of st.rerun() and auto-refresh patterns

The evolution from st.experimental_rerun() to st.rerun() in v1.37.0 introduced scope control. This enables more efficient refresh patterns.Key principle: use st.rerun() sparingly and prefer callbacks or fragments:


# ❌ Anti-pattern: Infinite loop risk
while condition:
    st.write("Updating...")
    st.rerun()  # No exit condition


# ✅ Best practice: Controlled rerun with conditions
if st.button("Refresh Data"):
    update_data()
    st.rerun()


# ✅ Better: Fragment-scoped reruns for partial updates
@st.fragment
def data_display():
    if st.button("Refresh This Section"):
        st.rerun(scope="fragment")  # Only reruns this fragment

    display_current_data()


# ✅ Best: Auto-refresh with streamlit-autorefresh
from streamlit_autorefresh import st_autorefresh


# Refresh every 2 seconds, limit to 100 refreshes
count = st_autorefresh(interval=2000, limit=100, key="data_refresh")


# Controllable refresh pattern
is_live = st.toggle("Live Updates")
refresh_rate = st.slider("Refresh Rate (seconds)", 1, 10, 2)

@st.fragment(run_every=f"{refresh_rate}s" if is_live else None)
def live_metrics():
    st.metric("Current Time", datetime.now().strftime("%H:%M:%S"))
    st.metric("Active Users", get_active_users())

Known issues and pitfalls with threading

Several critical issues persist in Streamlit’s threading model. Developers must navigate these carefully. Signal handling limitations prevent proper thread shutdown:


# ❌ This fails in background threads
signal.signal(signal.SIGTERM, handler)  # ValueError: signal only works in main thread


# ✅ Alternative: Use daemon threads and cleanup patterns
thread = threading.Thread(target=worker, daemon=True)
thread.start()

```**Context manager race conditions** were addressed in 2023 through the adoption of `contextvars`. However, **double initialization** remains problematic:

```python

# Use singleton pattern to prevent double initialization
@st.cache_resource
def get_background_manager():
    return BackgroundTaskManager()  # Only created once

```**SQLite thread safety** requires connection-per-thread patterns:

```python

# ❌ Fails: SQLite objects can't cross thread boundaries
conn = sqlite3.connect("database.db")


# ✅ Solution: Thread-local connections
thread_local = threading.local()

def get_db_connection():
    if not hasattr(thread_local, "connection"):
        thread_local.connection = sqlite3.connect("database.db")
    return thread_local.connection

Production architecture recommendations

For production applications requiring extensive background processing, the community strongly recommends task queue architectures. These use Redis with RQ or Celery:


# tasks.py - Separate task definitions
from rq import Queue
import redis

redis_conn = redis.Redis(host='localhost', port=6379)
task_queue = Queue(connection=redis_conn)

def enqueue_long_task(data):
    job = task_queue.enqueue(process_data, data, timeout='30m')
    return job.id


# streamlit_app.py - UI layer
if st.button("Process Large Dataset"):
    job_id = enqueue_long_task(uploaded_data)
    st.session_state.job_id = job_id
    st.info(f"Task queued: {job_id}")

if 'job_id' in st.session_state:
    job = task_queue.fetch_job(st.session_state.job_id)
    if job.is_finished:
        st.success(f"Completed! Result: {job.result}")
    elif job.is_failed:
        st.error(f"Task failed: {job.exc_info}")

Performance optimization strategies

Efficient async operations in Streamlit require careful resource management. Key optimizations include:

  1. Fragment-based updates minimize rerun overhead by isolating refresh zones
  2. Appropriate refresh intervals - maintain ≥1 second for production stability
  3. Data retention limits - keep only necessary historical data (e.g., last 100 points)
  4. Strategic caching with TTL values between 30-300 seconds based on data volatility
  5. Progress bar cleanup - always call .empty() to prevent memory leaks

The framework’s trajectory from 2023-2025 shows incremental improvements toward better async support. This is particularly evident for LLM streaming applications. It maintains its core synchronous execution model. Developers should use official patterns where available. They should implement external task processing for complex background requirements. Full native async support remains limited by architectural constraints.

                                                                           
comments powered by Disqus

Apache Spark Training
Kafka Tutorial
Akka Consulting
Cassandra Training
AWS Cassandra Database Support
Kafka Support Pricing
Cassandra Database Support Pricing
Non-stop Cassandra
Watchdog
Advantages of using Cloudurable™
Cassandra Consulting
Cloudurable™| Guide to AWS Cassandra Deploy
Cloudurable™| AWS Cassandra Guidelines and Notes
Free guide to deploying Cassandra on AWS
Kafka Training
Kafka Consulting
DynamoDB Training
DynamoDB Consulting
Kinesis Training
Kinesis Consulting
Kafka Tutorial PDF
Kubernetes Security Training
Redis Consulting
Redis Training
ElasticSearch / ELK Consulting
ElasticSearch Training
InfluxDB/TICK Training TICK Consulting