mirror of
https://github.com/AllanWang/Frost-for-Facebook.git
synced 2024-11-10 04:52:38 +01:00
Convert global continuations to completable deferred
This commit is contained in:
parent
8c77e02e89
commit
339ce9db98
@ -44,22 +44,6 @@ val fbAuth = Flyweight<String, RequestAuth>(GlobalScope, 100, 3600000 /* an hour
|
||||
it.getAuth()
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronously fetch [RequestAuth] from cookie
|
||||
* [action] will only be called if a valid auth is found.
|
||||
* Otherwise, [fail] will be called
|
||||
*/
|
||||
fun String?.fbRequest(fail: () -> Unit = {}, action: RequestAuth.() -> Unit) {
|
||||
if (this == null) return fail()
|
||||
try {
|
||||
val auth = runBlocking { fbAuth.fetch(this@fbRequest) }
|
||||
auth.action()
|
||||
} catch (e: Exception) {
|
||||
L.e { "Failed auth for ${hashCode()}: ${e.message}" }
|
||||
fail()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Underlying container for all fb requests
|
||||
*/
|
||||
|
@ -133,7 +133,7 @@ class HdImageFetcher(private val model: HdImageMaybe) : DataFetcher<InputStream>
|
||||
val result: Result<InputStream?> = runCatching {
|
||||
runBlocking {
|
||||
withTimeout(20000L) {
|
||||
val auth = fbAuth.fetch(model.cookie)
|
||||
val auth = fbAuth.fetch(model.cookie).await()
|
||||
if (cancelled) throw RuntimeException("Cancelled")
|
||||
val url = auth.getFullSizedImage(model.id).invoke() ?: throw RuntimeException("Null url")
|
||||
if (cancelled) throw RuntimeException("Cancelled")
|
||||
|
@ -74,7 +74,7 @@ class MenuFragment : GenericRecyclerFragment<MenuItemData, IItem<*, *>>() {
|
||||
override suspend fun reloadImpl(progress: (Int) -> Unit): List<MenuItemData>? = withContext(Dispatchers.IO) {
|
||||
val cookie = FbCookie.webCookie ?: return@withContext null
|
||||
progress(10)
|
||||
val auth = fbAuth.fetch(cookie)
|
||||
val auth = fbAuth.fetch(cookie).await()
|
||||
progress(30)
|
||||
val data = auth.getMenuData().invoke() ?: return@withContext null
|
||||
if (data.data.isEmpty()) return@withContext null
|
||||
|
@ -17,6 +17,7 @@
|
||||
package com.pitchedapps.frost.kotlin
|
||||
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.Job
|
||||
@ -25,7 +26,6 @@ import kotlinx.coroutines.isActive
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.selects.select
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import kotlin.coroutines.Continuation
|
||||
import kotlin.coroutines.resumeWithException
|
||||
import kotlin.coroutines.suspendCoroutine
|
||||
|
||||
@ -44,7 +44,7 @@ class Flyweight<K, V>(
|
||||
) {
|
||||
|
||||
// Receives a key and a pending request
|
||||
private val actionChannel = Channel<Pair<K, Continuation<V>>>(capacity)
|
||||
private val actionChannel = Channel<Pair<K, CompletableDeferred<V>>>(capacity)
|
||||
// Receives a key to invalidate the associated value
|
||||
private val invalidatorChannel = Channel<K>(capacity)
|
||||
// Receives a key to fetch the value
|
||||
@ -58,10 +58,17 @@ class Flyweight<K, V>(
|
||||
private val resultMap: MutableMap<K, Result<V>> = mutableMapOf()
|
||||
// Keeps track of unfulfilled actions
|
||||
// Note that the explicit type is very important here. See https://youtrack.jetbrains.net/issue/KT-18053
|
||||
private val pendingMap: MutableMap<K, MutableList<Continuation<V>>> = ConcurrentHashMap()
|
||||
private val pendingMap: MutableMap<K, MutableList<CompletableDeferred<V>>> = ConcurrentHashMap()
|
||||
|
||||
private val job: Job
|
||||
|
||||
private fun CompletableDeferred<V>.completeWith(result: Result<V>) {
|
||||
if (result.isSuccess)
|
||||
complete(result.getOrNull()!!)
|
||||
else
|
||||
completeExceptionally(result.exceptionOrNull()!!)
|
||||
}
|
||||
|
||||
init {
|
||||
job = scope.launch(Dispatchers.IO) {
|
||||
launch {
|
||||
@ -70,15 +77,15 @@ class Flyweight<K, V>(
|
||||
/*
|
||||
* New request received. Continuation should be fulfilled eventually
|
||||
*/
|
||||
actionChannel.onReceive { (key, continuation) ->
|
||||
actionChannel.onReceive { (key, completable) ->
|
||||
val lastUpdate = conditionMap[key]
|
||||
val lastResult = resultMap[key]
|
||||
// Valid value, retrieved within acceptable time
|
||||
if (lastResult != null && lastUpdate != null && System.currentTimeMillis() - lastUpdate < maxAge) {
|
||||
continuation.resumeWith(lastResult)
|
||||
completable.completeWith(lastResult)
|
||||
} else {
|
||||
val valueRequestPending = key in pendingMap
|
||||
pendingMap.getOrPut(key) { mutableListOf() }.add(continuation)
|
||||
pendingMap.getOrPut(key) { mutableListOf() }.add(completable)
|
||||
if (!valueRequestPending)
|
||||
requesterChannel.send(key)
|
||||
}
|
||||
@ -106,7 +113,7 @@ class Flyweight<K, V>(
|
||||
conditionMap[key] = System.currentTimeMillis()
|
||||
resultMap[key] = result
|
||||
pendingMap.remove(key)?.forEach {
|
||||
it.resumeWith(result)
|
||||
it.completeWith(result)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -126,11 +133,15 @@ class Flyweight<K, V>(
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun fetch(key: K): V = suspendCoroutine {
|
||||
if (!job.isActive) it.resumeWithException(IllegalStateException("Flyweight is not active"))
|
||||
else scope.launch {
|
||||
actionChannel.send(key to it)
|
||||
}
|
||||
/**
|
||||
* Queues the request, and returns a completable once it is sent to a channel.
|
||||
* The fetcher will only be suspended if the channels are full
|
||||
*/
|
||||
suspend fun fetch(key: K): CompletableDeferred<V> {
|
||||
val completable = CompletableDeferred<V>(job)
|
||||
if (!job.isActive) completable.completeExceptionally(IllegalStateException("Flyweight is not active"))
|
||||
else actionChannel.send(key to completable)
|
||||
return completable
|
||||
}
|
||||
|
||||
suspend fun invalidate(key: K) {
|
||||
@ -141,7 +152,7 @@ class Flyweight<K, V>(
|
||||
job.cancel()
|
||||
if (pendingMap.isNotEmpty()) {
|
||||
val error = CancellationException("Flyweight cancelled")
|
||||
pendingMap.values.flatten().forEach { it.resumeWithException(error) }
|
||||
pendingMap.values.flatten().forEach { it.completeExceptionally(error) }
|
||||
pendingMap.clear()
|
||||
}
|
||||
actionChannel.close()
|
||||
|
@ -179,7 +179,7 @@ class FrostRequestService : BaseJobService() {
|
||||
}
|
||||
launch(Dispatchers.IO) {
|
||||
try {
|
||||
val auth = fbAuth.fetch(cookie)
|
||||
val auth = fbAuth.fetch(cookie).await()
|
||||
command.invoke(auth, bundle)
|
||||
L.d {
|
||||
"Finished frost service for ${command.name} in ${System.currentTimeMillis() - startTime} ms"
|
||||
|
@ -28,7 +28,7 @@ import android.webkit.WebResourceRequest
|
||||
import android.webkit.WebView
|
||||
import ca.allanwang.kau.utils.fadeIn
|
||||
import ca.allanwang.kau.utils.isVisible
|
||||
import ca.allanwang.kau.utils.withMainContext
|
||||
import ca.allanwang.kau.utils.launchMain
|
||||
import com.pitchedapps.frost.dbflow.CookieModel
|
||||
import com.pitchedapps.frost.facebook.FB_LOGIN_URL
|
||||
import com.pitchedapps.frost.facebook.FB_USER_MATCHER
|
||||
@ -40,10 +40,8 @@ import com.pitchedapps.frost.injectors.jsInject
|
||||
import com.pitchedapps.frost.utils.L
|
||||
import com.pitchedapps.frost.utils.Prefs
|
||||
import com.pitchedapps.frost.utils.isFacebookUrl
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.suspendCancellableCoroutine
|
||||
import kotlin.coroutines.resume
|
||||
|
||||
/**
|
||||
* Created by Allan Wang on 2017-05-29.
|
||||
@ -54,7 +52,7 @@ class LoginWebView @JvmOverloads constructor(
|
||||
defStyleAttr: Int = 0
|
||||
) : WebView(context, attrs, defStyleAttr) {
|
||||
|
||||
private lateinit var loginCallback: (CookieModel) -> Unit
|
||||
private val completable: CompletableDeferred<CookieModel> = CompletableDeferred()
|
||||
private lateinit var progressCallback: (Int) -> Unit
|
||||
|
||||
@SuppressLint("SetJavaScriptEnabled")
|
||||
@ -65,19 +63,15 @@ class LoginWebView @JvmOverloads constructor(
|
||||
webChromeClient = LoginChromeClient()
|
||||
}
|
||||
|
||||
suspend fun loadLogin(progressCallback: (Int) -> Unit): CookieModel = withMainContext {
|
||||
coroutineScope {
|
||||
suspendCancellableCoroutine<CookieModel> { cont ->
|
||||
this@LoginWebView.progressCallback = progressCallback
|
||||
this@LoginWebView.loginCallback = { cont.resume(it) }
|
||||
L.d { "Begin loading login" }
|
||||
launch {
|
||||
FbCookie.reset()
|
||||
setupWebview()
|
||||
loadUrl(FB_LOGIN_URL)
|
||||
}
|
||||
}
|
||||
suspend fun loadLogin(progressCallback: (Int) -> Unit): CompletableDeferred<CookieModel> = coroutineScope {
|
||||
this@LoginWebView.progressCallback = progressCallback
|
||||
L.d { "Begin loading login" }
|
||||
launchMain {
|
||||
FbCookie.reset()
|
||||
setupWebview()
|
||||
loadUrl(FB_LOGIN_URL)
|
||||
}
|
||||
completable
|
||||
}
|
||||
|
||||
private inner class LoginClient : BaseWebViewClient() {
|
||||
@ -86,7 +80,7 @@ class LoginWebView @JvmOverloads constructor(
|
||||
super.onPageFinished(view, url)
|
||||
val cookieModel = checkForLogin(url)
|
||||
if (cookieModel != null)
|
||||
loginCallback(cookieModel)
|
||||
completable.complete(cookieModel)
|
||||
if (!view.isVisible) view.fadeIn()
|
||||
}
|
||||
|
||||
|
@ -54,7 +54,7 @@ class FlyweightTest {
|
||||
|
||||
@Test
|
||||
fun basic() {
|
||||
assertEquals(2, runBlocking { flyweight.fetch(1) }, "Invalid result")
|
||||
assertEquals(2, runBlocking { flyweight.fetch(1).await() }, "Invalid result")
|
||||
assertEquals(1, callCount.get(), "1 call expected")
|
||||
}
|
||||
|
||||
@ -62,9 +62,7 @@ class FlyweightTest {
|
||||
fun multipleWithOneKey() {
|
||||
val results: List<Int> = runBlocking {
|
||||
(0..1000).map {
|
||||
flyweight.scope.async {
|
||||
flyweight.fetch(1)
|
||||
}
|
||||
flyweight.fetch(1)
|
||||
}.map { it.await() }
|
||||
}
|
||||
assertEquals(1, callCount.get(), "1 call expected")
|
||||
@ -75,12 +73,12 @@ class FlyweightTest {
|
||||
@Test
|
||||
fun consecutiveReuse() {
|
||||
runBlocking {
|
||||
flyweight.fetch(1)
|
||||
flyweight.fetch(1).await()
|
||||
assertEquals(1, callCount.get(), "1 call expected")
|
||||
flyweight.fetch(1)
|
||||
flyweight.fetch(1).await()
|
||||
assertEquals(1, callCount.get(), "Reuse expected")
|
||||
Thread.sleep(300)
|
||||
flyweight.fetch(1)
|
||||
flyweight.fetch(1).await()
|
||||
assertEquals(2, callCount.get(), "Refetch expected")
|
||||
}
|
||||
}
|
||||
@ -88,10 +86,10 @@ class FlyweightTest {
|
||||
@Test
|
||||
fun invalidate() {
|
||||
runBlocking {
|
||||
flyweight.fetch(1)
|
||||
flyweight.fetch(1).await()
|
||||
assertEquals(1, callCount.get(), "1 call expected")
|
||||
flyweight.invalidate(1)
|
||||
flyweight.fetch(1)
|
||||
flyweight.fetch(1).await()
|
||||
assertEquals(2, callCount.get(), "New call expected")
|
||||
}
|
||||
}
|
||||
@ -100,10 +98,10 @@ class FlyweightTest {
|
||||
fun destroy() {
|
||||
runBlocking {
|
||||
val longRunningResult = async { flyweight.fetch(LONG_RUNNING_KEY) }
|
||||
flyweight.fetch(1)
|
||||
flyweight.fetch(1).await()
|
||||
flyweight.cancel()
|
||||
try {
|
||||
flyweight.fetch(1)
|
||||
flyweight.fetch(1).await()
|
||||
fail("Flyweight should not be fulfilled after it is destroyed")
|
||||
} catch (e: Exception) {
|
||||
assertEquals("Flyweight is not active", e.message, "Incorrect error found on fetch after destruction")
|
||||
|
Loading…
Reference in New Issue
Block a user