Skip to content

Commit cc271b9

Browse files
fix: Change to use plain StateFlow
* avoids fetching twice when changing context synchronously (due to explicit fetch plus context change propagating) * Add volatile to variable --------- Co-authored-by: Gastón Fournier <[email protected]>
1 parent a1093de commit cc271b9

File tree

4 files changed

+163
-77
lines changed

4 files changed

+163
-77
lines changed

unleashandroidsdk/src/main/java/io/getunleash/android/DefaultUnleash.kt

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -257,22 +257,26 @@ class DefaultUnleash(
257257
}
258258

259259
override fun setContext(context: UnleashContext) {
260-
unleashContextState.value = context
261260
if (started.get()) {
262-
refreshTogglesNow()
261+
runBlocking {
262+
withContext(Dispatchers.IO) {
263+
fetcher.refreshTogglesWithContext(context)
264+
}
265+
}
263266
}
267+
unleashContextState.value = context
264268
}
265269

266270
@Throws(TimeoutException::class)
267271
override fun setContextWithTimeout(context: UnleashContext, timeout: Long) {
268-
unleashContextState.value = context
269272
if (started.get()) {
270273
runBlocking {
271274
withTimeout(timeout) {
272-
fetcher.refreshToggles()
275+
fetcher.refreshTogglesWithContext(context)
273276
}
274277
}
275278
}
279+
unleashContextState.value = context
276280
}
277281

278282
override fun setContextAsync(context: UnleashContext) {

unleashandroidsdk/src/main/java/io/getunleash/android/polling/UnleashFetcher.kt

Lines changed: 87 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ import io.getunleash.android.errors.ServerException
1111
import io.getunleash.android.events.HeartbeatEvent
1212
import io.getunleash.android.http.Throttler
1313
import io.getunleash.android.unleashScope
14+
import java.io.Closeable
15+
import java.io.IOException
16+
import java.util.concurrent.TimeUnit
17+
import java.util.concurrent.atomic.AtomicReference
18+
import kotlin.coroutines.CoroutineContext
19+
import kotlin.coroutines.resume
20+
import kotlin.coroutines.resumeWithException
1421
import kotlinx.coroutines.Dispatchers
1522
import kotlinx.coroutines.channels.BufferOverflow
1623
import kotlinx.coroutines.flow.MutableSharedFlow
@@ -30,54 +37,55 @@ import okhttp3.OkHttpClient
3037
import okhttp3.Request
3138
import okhttp3.Response
3239
import okhttp3.internal.closeQuietly
33-
import java.io.Closeable
34-
import java.io.IOException
35-
import java.util.concurrent.TimeUnit
36-
import java.util.concurrent.atomic.AtomicReference
37-
import kotlin.coroutines.CoroutineContext
38-
import kotlin.coroutines.resume
39-
import kotlin.coroutines.resumeWithException
4040

4141
/**
42-
* Http Client for fetching data from Unleash Proxy.
43-
* By default creates an OkHttpClient with readTimeout set to 2 seconds and a cache of 10 MBs
44-
* @param httpClient - the http client to use for fetching toggles from Unleash proxy
42+
* Http Client for fetching data from Unleash Proxy. By default creates an OkHttpClient with
43+
* readTimeout set to 2 seconds and a cache of 10 MBs
44+
* @param httpClient
45+
* - the http client to use for fetching toggles from Unleash proxy
4546
*/
4647
open class UnleashFetcher(
47-
unleashConfig: UnleashConfig,
48-
private val httpClient: OkHttpClient,
49-
private val unleashContext: StateFlow<UnleashContext>,
48+
unleashConfig: UnleashConfig,
49+
private val httpClient: OkHttpClient,
50+
private val unleashContext: StateFlow<UnleashContext>,
5051
) : Closeable {
5152
companion object {
5253
private const val TAG = "UnleashFetcher"
5354
}
54-
55+
@Volatile private var contextForLastFetch: UnleashContext? = null
5556
private val proxyUrl = unleashConfig.proxyUrl?.toHttpUrl()
56-
private val applicationHeaders = unleashConfig.getApplicationHeaders(unleashConfig.pollingStrategy)
57+
private val applicationHeaders =
58+
unleashConfig.getApplicationHeaders(unleashConfig.pollingStrategy)
5759
private val appName = unleashConfig.appName
5860
private var etag: String? = null
59-
private val featuresReceivedFlow = MutableSharedFlow<UnleashState>(
60-
replay = 1,
61-
onBufferOverflow = BufferOverflow.DROP_OLDEST
62-
)
63-
private val fetcherHeartbeatFlow = MutableSharedFlow<HeartbeatEvent>(
64-
extraBufferCapacity = 5,
65-
onBufferOverflow = BufferOverflow.DROP_OLDEST
66-
)
61+
private val featuresReceivedFlow =
62+
MutableSharedFlow<UnleashState>(
63+
replay = 1,
64+
onBufferOverflow = BufferOverflow.DROP_OLDEST
65+
)
66+
private val fetcherHeartbeatFlow =
67+
MutableSharedFlow<HeartbeatEvent>(
68+
extraBufferCapacity = 5,
69+
onBufferOverflow = BufferOverflow.DROP_OLDEST
70+
)
6771
private val coroutineContextForContextChange: CoroutineContext = Dispatchers.IO
6872
private val currentCall = AtomicReference<Call?>(null)
6973
private val throttler =
70-
Throttler(
71-
TimeUnit.MILLISECONDS.toSeconds(unleashConfig.pollingStrategy.interval),
72-
longestAcceptableIntervalSeconds = 300,
73-
proxyUrl.toString()
74-
)
74+
Throttler(
75+
TimeUnit.MILLISECONDS.toSeconds(unleashConfig.pollingStrategy.interval),
76+
longestAcceptableIntervalSeconds = 300,
77+
proxyUrl.toString()
78+
)
7579

7680
fun getFeaturesReceivedFlow() = featuresReceivedFlow.asSharedFlow()
7781

7882
fun startWatchingContext() {
7983
unleashScope.launch {
80-
unleashContext.distinctUntilChanged { old, new -> old != new }.collect {
84+
unleashContext.collect {
85+
if (it == contextForLastFetch) {
86+
Log.d(TAG, "Context unchanged, skipping refresh toggles")
87+
return@collect
88+
}
8189
withContext(coroutineContextForContextChange) {
8290
Log.d(TAG, "Unleash context changed: $it")
8391
refreshToggles()
@@ -89,7 +97,7 @@ open class UnleashFetcher(
8997
suspend fun refreshToggles(): ToggleResponse {
9098
if (throttler.performAction()) {
9199
Log.d(TAG, "Refreshing toggles")
92-
val response = refreshTogglesWithContext(unleashContext.value)
100+
val response = doFetchToggles(unleashContext.value)
93101
fetcherHeartbeatFlow.emit(HeartbeatEvent(response.status, response.error?.message))
94102
return response
95103
}
@@ -98,15 +106,28 @@ open class UnleashFetcher(
98106
return ToggleResponse(Status.THROTTLED)
99107
}
100108

101-
internal suspend fun refreshTogglesWithContext(ctx: UnleashContext): ToggleResponse {
109+
suspend fun refreshTogglesWithContext(ctx: UnleashContext): ToggleResponse {
110+
if (throttler.performAction()) {
111+
Log.d(TAG, "Refreshing toggles")
112+
val response = doFetchToggles(ctx)
113+
fetcherHeartbeatFlow.emit(HeartbeatEvent(response.status, response.error?.message))
114+
return response
115+
}
116+
Log.i(TAG, "Skipping refresh toggles due to throttling")
117+
fetcherHeartbeatFlow.emit(HeartbeatEvent(Status.THROTTLED))
118+
return ToggleResponse(Status.THROTTLED)
119+
}
120+
121+
internal suspend fun doFetchToggles(ctx: UnleashContext): ToggleResponse {
122+
contextForLastFetch = ctx
102123
val response = fetchToggles(ctx)
103124
if (response.isSuccess()) {
104125

105-
val toggles = response.config!!.toggles.groupBy { it.name }
106-
.mapValues { (_, v) -> v.first() }
126+
val toggles =
127+
response.config!!.toggles.groupBy { it.name }.mapValues { (_, v) -> v.first() }
107128
Log.d(
108-
TAG,
109-
"Fetched new state with ${toggles.size} toggles, emitting featuresReceivedFlow"
129+
TAG,
130+
"Fetched new state with ${toggles.size} toggles, emitting featuresReceivedFlow"
110131
)
111132
featuresReceivedFlow.emit(UnleashState(ctx, toggles))
112133
return ToggleResponse(response.status, toggles)
@@ -124,26 +145,31 @@ open class UnleashFetcher(
124145

125146
private suspend fun fetchToggles(ctx: UnleashContext): FetchResponse {
126147
if (proxyUrl == null) {
127-
return FetchResponse(Status.FAILED, error = IllegalStateException("Proxy URL is not set"))
148+
return FetchResponse(
149+
Status.FAILED,
150+
error = IllegalStateException("Proxy URL is not set")
151+
)
128152
}
129153
val contextUrl = buildContextUrl(ctx)
130154
try {
131-
val request = Request.Builder().url(contextUrl)
132-
.headers(applicationHeaders.toHeaders())
155+
val request = Request.Builder().url(contextUrl).headers(applicationHeaders.toHeaders())
133156
if (etag != null) {
134157
request.header("If-None-Match", etag!!)
135158
}
136159
val call = this.httpClient.newCall(request.build())
137160
val inFlightCall = currentCall.get()
138161
if (!currentCall.compareAndSet(inFlightCall, call)) {
139162
return FetchResponse(
140-
Status.FAILED,
141-
error = IllegalStateException("Failed to set new call while ${inFlightCall?.request()?.url} is in flight")
163+
Status.FAILED,
164+
error =
165+
IllegalStateException(
166+
"Failed to set new call while ${inFlightCall?.request()?.url} is in flight"
167+
)
142168
)
143-
} else if (inFlightCall != null && !inFlightCall.isCanceled()) {
169+
} else if (inFlightCall != null && !inFlightCall.isCanceled() && !inFlightCall.isExecuted()) {
144170
Log.d(
145-
TAG,
146-
"Cancelling previous ${inFlightCall.request().method} ${inFlightCall.request().url}"
171+
TAG,
172+
"Cancelling previous ${inFlightCall.request().method} ${inFlightCall.request().url}"
147173
)
148174
inFlightCall.cancel()
149175
}
@@ -159,23 +185,21 @@ open class UnleashFetcher(
159185
res.body?.use { b ->
160186
try {
161187
val proxyResponse: ProxyResponse =
162-
proxyResponseAdapter.fromJson(b.string())!!
188+
proxyResponseAdapter.fromJson(b.string())!!
163189
FetchResponse(Status.SUCCESS, proxyResponse)
164190
} catch (e: Exception) {
165191
// If we fail to parse, just keep data
166192
FetchResponse(Status.FAILED, error = e)
167193
}
168-
} ?: FetchResponse(Status.FAILED, error = NoBodyException())
194+
}
195+
?: FetchResponse(Status.FAILED, error = NoBodyException())
169196
}
170-
171197
res.code == 304 -> {
172198
FetchResponse(Status.NOT_MODIFIED)
173199
}
174-
175200
res.code == 401 -> {
176201
FetchResponse(Status.FAILED, error = NotAuthorizedException())
177202
}
178-
179203
else -> {
180204
FetchResponse(Status.FAILED, error = ServerException(res.code))
181205
}
@@ -188,31 +212,33 @@ open class UnleashFetcher(
188212

189213
private suspend fun Call.await(): Response {
190214
return suspendCancellableCoroutine { continuation ->
191-
enqueue(object : Callback {
192-
override fun onResponse(call: Call, response: Response) {
193-
continuation.resume(response)
194-
}
215+
enqueue(
216+
object : Callback {
217+
override fun onResponse(call: Call, response: Response) {
218+
continuation.resume(response)
219+
}
195220

196-
override fun onFailure(call: Call, e: IOException) {
197-
// Don't bother with resuming the continuation if it is already cancelled.
198-
if (continuation.isCancelled) return
199-
continuation.resumeWithException(e)
200-
}
201-
})
221+
override fun onFailure(call: Call, e: IOException) {
222+
// Don't bother with resuming the continuation if it is already
223+
// cancelled.
224+
if (continuation.isCancelled) return
225+
continuation.resumeWithException(e)
226+
}
227+
}
228+
)
202229

203230
continuation.invokeOnCancellation {
204231
try {
205232
cancel()
206233
} catch (ex: Throwable) {
207-
//Ignore cancel exception
234+
// Ignore cancel exception
208235
}
209236
}
210237
}
211238
}
212239

213240
private fun buildContextUrl(ctx: UnleashContext): HttpUrl {
214-
var contextUrl = proxyUrl!!.newBuilder()
215-
.addQueryParameter("appName", appName)
241+
var contextUrl = proxyUrl!!.newBuilder().addQueryParameter("appName", appName)
216242
if (ctx.userId != null) {
217243
contextUrl.addQueryParameter("userId", ctx.userId)
218244
}

0 commit comments

Comments
 (0)