---
name: kotlin-coroutines-flows
description: Kotlin Coroutines and Flow patterns for Android and KMP - structured concurrency, Flow operators, StateFlow, error handling, and testing.
origin: ECC
---

# Kotlin Coroutines & Flows

Patterns for structured concurrency, Flow-based reactive streams, and coroutine testing in Android and Kotlin Multiplatform projects.

## When to Activate

- Writing async code with Kotlin coroutines
- Using Flow, StateFlow, or SharedFlow for reactive data
- Handling concurrent operations (parallel loading, debounce, retry)
- Testing coroutines and Flows
- Managing coroutine scopes and cancellation

## Structured Concurrency

### Scope Hierarchy

```
Application
  └── viewModelScope (ViewModel)
        └── coroutineScope { } (structured child)
              ├── async { } (concurrent task)
              └── async { } (concurrent task)
```

Always use structured concurrency and never `GlobalScope`:

```kotlin
// BAD
GlobalScope.launch { fetchData() }

// GOOD: scoped to ViewModel lifecycle
viewModelScope.launch { fetchData() }

// GOOD: scoped to composable lifecycle
LaunchedEffect(key) { fetchData() }
```

### Parallel Decomposition

Use `coroutineScope` + `async` for parallel work:

```kotlin
suspend fun loadDashboard(): Dashboard = coroutineScope {
    val items = async { itemRepository.getRecent() }
    val stats = async { statsRepository.getToday() }
    val profile = async { userRepository.getCurrent() }
    Dashboard(
        items = items.await(),
        stats = stats.await(),
        profile = profile.await()
    )
}
```

### SupervisorScope

Use `supervisorScope` when child failures should not cancel siblings:

```kotlin
suspend fun syncAll() = supervisorScope {
    launch { syncItems() } // failure here won't cancel syncStats
    launch { syncStats() }
    launch { syncSettings() }
}
```

## Flow Patterns

### Cold Flow: One-Shot to Stream Conversion

```kotlin
fun observeItems(): Flow<List<Item>> = flow {
    // Re-emits whenever the database changes
    itemDao.observeAll()
        .map { entities -> entities.map { it.toDomain() } }
        .collect { emit(it) }
}
```

### StateFlow for UI State

```kotlin
class DashboardViewModel(
    observeProgress: ObserveUserProgressUseCase
) : ViewModel() {
    val progress: StateFlow<UserProgress> = observeProgress()
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5_000),
            initialValue = UserProgress.EMPTY
        )
}
```

`WhileSubscribed(5_000)` keeps the upstream active for 5 seconds after the last subscriber leaves, so the flow survives configuration changes without restarting.

### Combining Multiple Flows

```kotlin
val uiState: StateFlow<HomeState> = combine(
    itemRepository.observeItems(),
    settingsRepository.observeTheme(),
    userRepository.observeProfile()
) { items, theme, profile ->
    HomeState(items = items, theme = theme, profile = profile)
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5_000), HomeState())
```

### Flow Operators

```kotlin
// Debounce search input
// (debounce and flatMapLatest may need @OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class))
searchQuery
    .debounce(300)
    .distinctUntilChanged()
    .flatMapLatest { query -> repository.search(query) }
    .catch { emit(emptyList()) }
    .collect { results -> _state.update { it.copy(results = results) } }

// Retry with exponential backoff: waits 1 s, 2 s, then 4 s before giving up.
// The return type is inferred from api.fetch().
fun fetchWithRetry() = flow { emit(api.fetch()) }
    .retryWhen { cause, attempt ->
        if (cause is IOException && attempt < 3) {
            delay(1000L * (1 shl attempt.toInt()))
            true
        } else {
            false
        }
    }
```

### SharedFlow for One-Time Events

```kotlin
class ItemListViewModel : ViewModel() {
    private val _effects = MutableSharedFlow<Effect>()
    val effects: SharedFlow<Effect> = _effects.asSharedFlow()

    sealed interface Effect {
        data class ShowSnackbar(val message: String) : Effect
        data class NavigateTo(val route: String) : Effect
    }

    private fun deleteItem(id: String) {
        viewModelScope.launch {
            repository.delete(id)
            _effects.emit(Effect.ShowSnackbar("Item deleted"))
        }
    }
}

// Collect in Composable
LaunchedEffect(Unit) {
    viewModel.effects.collect { effect ->
        when (effect) {
            is Effect.ShowSnackbar -> snackbarHostState.showSnackbar(effect.message)
            is Effect.NavigateTo -> navController.navigate(effect.route)
        }
    }
}
```

## Dispatchers

```kotlin
// CPU-intensive work
withContext(Dispatchers.Default) { parseJson(largePayload) }

// IO-bound work
withContext(Dispatchers.IO) { database.query() }

// Main thread (UI): the default in viewModelScope
withContext(Dispatchers.Main) { updateUi() }
```

In KMP, use `Dispatchers.Default` and `Dispatchers.Main`, which are declared in common code. `Dispatchers.IO` is not available from common code; on platforms without it, use `Dispatchers.Default` or provide a dispatcher via DI.

## Cancellation

### Cooperative Cancellation

Long-running loops must check for cancellation:

```kotlin
suspend fun processItems(items: List<Item>) = coroutineScope {
    for (item in items) {
        ensureActive() // throws CancellationException if cancelled
        process(item)
    }
}
```

### Cleanup with try/finally

```kotlin
viewModelScope.launch {
    try {
        _state.update { it.copy(isLoading = true) }
        val data = repository.fetch()
        _state.update { it.copy(data = data) }
    } finally {
        _state.update { it.copy(isLoading = false) } // always runs, even on cancellation
    }
}
```

## Testing

### Testing StateFlow with Turbine

```kotlin
@Test
fun `search updates item list`() = runTest {
    val fakeRepository = FakeItemRepository(items = testItems)
    val viewModel = ItemListViewModel(GetItemsUseCase(fakeRepository))

    viewModel.state.test {
        assertEquals(ItemListState(), awaitItem()) // initial

        viewModel.onSearch("query")

        val loading = awaitItem()
        assertTrue(loading.isLoading)

        val loaded = awaitItem()
        assertFalse(loaded.isLoading)
        assertEquals(1, loaded.items.size)
    }
}
```

### Testing with TestDispatcher

```kotlin
@Test
fun `parallel load completes correctly`() = runTest {
    val viewModel = DashboardViewModel(
        itemRepo = FakeItemRepo(),
        statsRepo = FakeStatsRepo()
    )

    viewModel.load()
    advanceUntilIdle()

    val state = viewModel.state.value
    assertNotNull(state.items)
    assertNotNull(state.stats)
}
```

### Faking Flows

```kotlin
class FakeItemRepository : ItemRepository {
    private val _items = MutableStateFlow<List<Item>>(emptyList())

    override fun observeItems(): Flow<List<Item>> = _items

    fun emit(items: List<Item>) {
        _items.value = items
    }

    override suspend fun getItemsByCategory(category: String): Result<List<Item>> {
        return Result.success(_items.value.filter { it.category == category })
    }
}
```

## Anti-Patterns to Avoid

- Using `GlobalScope`: it leaks coroutines and provides no structured cancellation
- Collecting Flows in `init {}` without a scope: use `viewModelScope.launch`
- Using `MutableStateFlow` with mutable collections: always use immutable copies, e.g. `_state.update { it.copy(list = it.list + newItem) }`
- Catching `CancellationException`: let it propagate for proper cancellation
- Using `flowOn(Dispatchers.Main)` to collect: `flowOn` only changes the upstream context, and collection runs on the caller's dispatcher
- Creating a `Flow` in a `@Composable` without `remember`: this recreates the flow on every recomposition

## References

See skill: `compose-multiplatform-patterns` for UI consumption of Flows.

See skill: `android-clean-architecture` for where coroutines fit in layers.
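
The `CancellationException` item under Anti-Patterns to Avoid can be made concrete with a small sketch. It assumes `kotlinx-coroutines-core` is on the classpath; `runCatchingCancellable` is a hypothetical helper name introduced here for illustration, not part of any library. The idea is to wrap a suspending call in a `Result` while rethrowing cancellation so the parent scope still sees it.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: captures ordinary failures as a Result,
// but never swallows CancellationException.
suspend fun <T> runCatchingCancellable(block: suspend () -> T): Result<T> =
    try {
        Result.success(block())
    } catch (e: CancellationException) {
        throw e // rethrow so structured cancellation keeps working
    } catch (e: Exception) {
        Result.failure(e)
    }

fun main() = runBlocking {
    // An ordinary exception is captured as a failed Result.
    val failed = runCatchingCancellable<Int> { throw IllegalStateException("boom") }
    println(failed.isFailure)

    // Cancellation is rethrown, so the line after the helper is never reached.
    var reachedAfterCancel = false
    val job = launch {
        runCatchingCancellable { delay(10_000) }
        reachedAfterCancel = true
    }
    delay(50) // let the child start and suspend inside delay()
    job.cancelAndJoin()
    println(reachedAfterCancel)
}
```

By contrast, a plain `runCatching { ... }` around a suspending call would catch the `CancellationException` as well, and the cancelled coroutine would carry on to the next statement instead of stopping at that point.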