From 790331e7984541dd7d78c6dbec1c26fe5437f40e Mon Sep 17 00:00:00 2001 From: Kelvin <kelvin@futo.org> Date: Tue, 1 Oct 2024 14:30:33 +0200 Subject: [PATCH] History sync support --- .../platformplayer/api/media/Serializer.kt | 2 +- .../models/video/SerializedPlatformVideo.kt | 4 +- .../mainactivity/main/VideoDetailView.kt | 4 +- .../platformplayer/states/StateHistory.kt | 43 +++++++++++++- .../futo/platformplayer/states/StateSync.kt | 23 +++++++- .../stores/StringTMapStorage.kt | 29 ++++++++++ .../futo/platformplayer/sync/GJSyncOpcodes.kt | 4 ++ .../platformplayer/sync/SyncSessionData.kt | 13 +++++ .../sync/internal/SyncSession.kt | 58 +++++++++++++++++++ 9 files changed, 171 insertions(+), 9 deletions(-) create mode 100644 app/src/main/java/com/futo/platformplayer/stores/StringTMapStorage.kt create mode 100644 app/src/main/java/com/futo/platformplayer/sync/SyncSessionData.kt diff --git a/app/src/main/java/com/futo/platformplayer/api/media/Serializer.kt b/app/src/main/java/com/futo/platformplayer/api/media/Serializer.kt index 40c3edfd..a4501545 100644 --- a/app/src/main/java/com/futo/platformplayer/api/media/Serializer.kt +++ b/app/src/main/java/com/futo/platformplayer/api/media/Serializer.kt @@ -4,6 +4,6 @@ import kotlinx.serialization.json.Json class Serializer { companion object { - val json = Json { ignoreUnknownKeys = true; encodeDefaults = true; }; + val json = Json { ignoreUnknownKeys = true; encodeDefaults = true; coerceInputValues = true }; } } \ No newline at end of file diff --git a/app/src/main/java/com/futo/platformplayer/api/media/models/video/SerializedPlatformVideo.kt b/app/src/main/java/com/futo/platformplayer/api/media/models/video/SerializedPlatformVideo.kt index ee49ebca..5096ce54 100644 --- a/app/src/main/java/com/futo/platformplayer/api/media/models/video/SerializedPlatformVideo.kt +++ b/app/src/main/java/com/futo/platformplayer/api/media/models/video/SerializedPlatformVideo.kt @@ -19,9 +19,9 @@ open class SerializedPlatformVideo( override val thumbnails: Thumbnails, override val author: PlatformAuthorLink, @kotlinx.serialization.Serializable(with = OffsetDateTimeNullableSerializer::class) - override val datetime: OffsetDateTime?, + override val datetime: OffsetDateTime? = null, override val url: String, - override val shareUrl: String, + override val shareUrl: String = "", override val duration: Long, override val viewCount: Long, diff --git a/app/src/main/java/com/futo/platformplayer/fragment/mainactivity/main/VideoDetailView.kt b/app/src/main/java/com/futo/platformplayer/fragment/mainactivity/main/VideoDetailView.kt index 0789b904..30d0ab30 100644 --- a/app/src/main/java/com/futo/platformplayer/fragment/mainactivity/main/VideoDetailView.kt +++ b/app/src/main/java/com/futo/platformplayer/fragment/mainactivity/main/VideoDetailView.kt @@ -1477,7 +1477,7 @@ class VideoDetailView : ConstraintLayout { val historyItem = getHistoryIndex(videoDetail) ?: return@launch; withContext(Dispatchers.Main) { - _historicalPosition = StateHistory.instance.updateHistoryPosition(video, historyItem,false, (toResume.toFloat() / 1000.0f).toLong()); + _historicalPosition = StateHistory.instance.updateHistoryPosition(video, historyItem,false, (toResume.toFloat() / 1000.0f).toLong(), null, true); Logger.i(TAG, "Historical position: $_historicalPosition, last position: $lastPositionMilliseconds"); if (_historicalPosition > 60 && video.duration - _historicalPosition > 5 && Math.abs(_historicalPosition - lastPositionMilliseconds / 1000) > 5.0) { _layoutResume.visibility = View.VISIBLE; @@ -2497,7 +2497,7 @@ class VideoDetailView : ConstraintLayout { if (v !is TutorialFragment.TutorialVideo) { fragment.lifecycleScope.launch(Dispatchers.IO) { val history = getHistoryIndex(v) ?: return@launch; - StateHistory.instance.updateHistoryPosition(v, history, true, (positionMilliseconds.toFloat() / 1000.0f).toLong()); + StateHistory.instance.updateHistoryPosition(v, history, true, (positionMilliseconds.toFloat() / 1000.0f).toLong(), null, true); } } _lastPositionSaveTime = currentTime; diff --git a/app/src/main/java/com/futo/platformplayer/states/StateHistory.kt b/app/src/main/java/com/futo/platformplayer/states/StateHistory.kt index bd87b9b0..d5da7880 100644 --- a/app/src/main/java/com/futo/platformplayer/states/StateHistory.kt +++ b/app/src/main/java/com/futo/platformplayer/states/StateHistory.kt @@ -12,9 +12,13 @@ import com.futo.platformplayer.stores.FragmentedStorage import com.futo.platformplayer.stores.db.ManagedDBStore import com.futo.platformplayer.stores.db.types.DBHistory import com.futo.platformplayer.stores.v2.ReconstructStore +import com.futo.platformplayer.sync.internal.GJSyncOpcodes +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import java.time.OffsetDateTime import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap +import kotlin.math.min class StateHistory { //Legacy @@ -56,7 +60,7 @@ class StateHistory { } - fun updateHistoryPosition(liveObj: IPlatformVideo, index: DBHistory.Index, updateExisting: Boolean, position: Long = -1L): Long { + fun updateHistoryPosition(liveObj: IPlatformVideo, index: DBHistory.Index, updateExisting: Boolean, position: Long = -1L, date: OffsetDateTime? = null, isUserAction: Boolean = false): Long { val pos = if(position < 0) 0 else position; val historyVideo = index.obj; @@ -76,16 +80,49 @@ class StateHistory { historyVideo.video = SerializedPlatformVideo.fromVideo(liveObj); historyVideo.position = pos; - historyVideo.date = OffsetDateTime.now(); + historyVideo.date = date ?: OffsetDateTime.now(); _historyDBStore.update(index.id!!, historyVideo); onHistoricVideoChanged.emit(liveObj, pos); } + + if(isUserAction) { + StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { + if(StateSync.instance.hasAtLeastOneOnlineDevice()) { + Logger.i(TAG, "SyncHistory playback broadcasted (${liveObj.name}: ${position})"); + StateSync.instance.broadcastJson( + GJSyncOpcodes.syncHistory, + listOf(historyVideo) + ); + } + }; + } return positionBefore; } - return positionBefore; } + + fun getRecentHistory(minDate: OffsetDateTime, max: Int = 1000): List<HistoryVideo> { + val pager = getHistoryPager(); + val videos = pager.getResults().filter { it.date > minDate }.toMutableList(); + while(pager.hasMorePages() && videos.size < max) { + pager.nextPage(); + val newResults = pager.getResults(); + var foundEnd = false; + for(item in newResults) { + if(item.date < minDate) { + foundEnd = true; + break; + } + else + videos.add(item); + } + if(foundEnd) + break; + } + return videos; + } + fun getHistoryPager(): IPager<HistoryVideo> { return _historyDBStore.getObjectPager(); } diff --git a/app/src/main/java/com/futo/platformplayer/states/StateSync.kt b/app/src/main/java/com/futo/platformplayer/states/StateSync.kt index e0753dc5..a8b76152 100644 --- a/app/src/main/java/com/futo/platformplayer/states/StateSync.kt +++ b/app/src/main/java/com/futo/platformplayer/states/StateSync.kt @@ -20,6 +20,8 @@ import com.futo.platformplayer.stores.FragmentedStorage import com.futo.platformplayer.stores.StringStringMapStorage import com.futo.platformplayer.stores.StringArrayStorage import com.futo.platformplayer.stores.StringStorage +import com.futo.platformplayer.stores.StringTMapStorage +import com.futo.platformplayer.sync.SyncSessionData import com.futo.platformplayer.sync.internal.GJSyncOpcodes import com.futo.platformplayer.sync.internal.SyncDeviceInfo import com.futo.platformplayer.sync.internal.SyncKeyPair @@ -44,6 +46,7 @@ class StateSync { private val _authorizedDevices = FragmentedStorage.get<StringArrayStorage>("authorized_devices") private val _syncKeyPair = FragmentedStorage.get<StringStorage>("sync_key_pair") private val _lastAddressStorage = FragmentedStorage.get<StringStringMapStorage>("sync_last_address_storage") + private val _syncSessionData = FragmentedStorage.get<StringTMapStorage<SyncSessionData>>("syncSessionData") private var _serverSocket: ServerSocket? = null private var _thread: Thread? = null @@ -190,6 +193,16 @@ class StateSync { }; } + fun getSyncSessionData(key: String): SyncSessionData { + return _syncSessionData.get(key) ?: SyncSessionData(key); + } + fun getSyncSessionDataString(key: String): String { + return Json.encodeToString(getSyncSessionData(key)); + } + fun saveSyncSessionData(data: SyncSessionData){ + _syncSessionData.setAndSave(data.publicKey, data); + } + private fun handleServiceUpdated(services: List<DnsService>) { if (!Settings.instance.synchronization.connectDiscovered) { return @@ -343,6 +356,9 @@ class StateSync { }) } + inline fun <reified T> broadcastJson(opcode: UByte, data: T) { + broadcast(opcode, Json.encodeToString(data)); + } fun broadcast(opcode: UByte, data: String) { broadcast(opcode, data.toByteArray(Charsets.UTF_8)); } @@ -363,7 +379,7 @@ class StateSync { val time = measureTimeMillis { //val export = StateBackup.export(); //session.send(GJSyncOpcodes.syncExport, export.asZip()); - session.send(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString()); + session.send(GJSyncOpcodes.syncStateExchange, getSyncSessionDataString(session.remotePublicKey)); } Logger.i(TAG, "Generated and sent sync export in ${time}ms"); } @@ -398,6 +414,11 @@ class StateSync { return _authorizedDevices.values.isNotEmpty() } } + fun hasAtLeastOneOnlineDevice(): Boolean { + synchronized(_sessions) { + return _sessions.any{ it.value.connected && it.value.isAuthorized }; + } + } fun getAll(): List<String> { synchronized(_authorizedDevices) { diff --git a/app/src/main/java/com/futo/platformplayer/stores/StringTMapStorage.kt b/app/src/main/java/com/futo/platformplayer/stores/StringTMapStorage.kt new file mode 100644 index 00000000..8216978c --- /dev/null +++ b/app/src/main/java/com/futo/platformplayer/stores/StringTMapStorage.kt @@ -0,0 +1,29 @@ +package com.futo.platformplayer.stores + +import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.Json + +@kotlinx.serialization.Serializable +class StringTMapStorage<T> : FragmentedStorageFileJson() { + var map: HashMap<String, T> = hashMapOf() + + override fun encode(): String { + return Json.encodeToString(this) + } + + fun get(key: String): T? { + return map[key] + } + + fun setAndSave(key: String, value: T): T { + map[key] = value + save() + return value + } + + fun setAndSaveBlocking(key: String, value: T): T { + map[key] = value + saveBlocking() + return value + } +} diff --git a/app/src/main/java/com/futo/platformplayer/sync/GJSyncOpcodes.kt b/app/src/main/java/com/futo/platformplayer/sync/GJSyncOpcodes.kt index 469362a6..a6cef5d4 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/GJSyncOpcodes.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/GJSyncOpcodes.kt @@ -4,8 +4,12 @@ class GJSyncOpcodes { companion object { val sendToDevices: UByte = 101.toUByte(); + val syncStateExchange: UByte = 150.toUByte(); + val syncExport: UByte = 201.toUByte(); + val syncSubscriptions: UByte = 202.toUByte(); + val syncHistory: UByte = 203.toUByte(); } } \ No newline at end of file diff --git a/app/src/main/java/com/futo/platformplayer/sync/SyncSessionData.kt b/app/src/main/java/com/futo/platformplayer/sync/SyncSessionData.kt new file mode 100644 index 00000000..71d2ba3e --- /dev/null +++ b/app/src/main/java/com/futo/platformplayer/sync/SyncSessionData.kt @@ -0,0 +1,13 @@ +package com.futo.platformplayer.sync + +import com.futo.platformplayer.serializers.OffsetDateTimeSerializer +import kotlinx.serialization.Serializable +import java.time.OffsetDateTime + +@Serializable +class SyncSessionData(var publicKey: String) { + @kotlinx.serialization.Serializable(with = OffsetDateTimeSerializer::class) + var lastHistory: OffsetDateTime = OffsetDateTime.MIN; + @kotlinx.serialization.Serializable(with = OffsetDateTimeSerializer::class) + var lastSubscription: OffsetDateTime = OffsetDateTime.MIN; +} \ No newline at end of file diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt index b0b54df8..9f6b77f5 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt @@ -4,19 +4,25 @@ import com.futo.platformplayer.UIDialogs import com.futo.platformplayer.activities.MainActivity import com.futo.platformplayer.api.media.Serializer import com.futo.platformplayer.logging.Logger +import com.futo.platformplayer.models.HistoryVideo import com.futo.platformplayer.models.Subscription import com.futo.platformplayer.states.StateApp import com.futo.platformplayer.states.StateBackup +import com.futo.platformplayer.states.StateHistory import com.futo.platformplayer.states.StatePlayer import com.futo.platformplayer.states.StateSubscriptions +import com.futo.platformplayer.states.StateSync +import com.futo.platformplayer.sync.SyncSessionData import com.futo.platformplayer.sync.internal.SyncSocketSession.Opcode import com.futo.platformplayer.sync.models.SendToDevicePackage import com.futo.platformplayer.sync.models.SyncSubscriptionsPackage import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch +import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import java.io.ByteArrayInputStream import java.nio.ByteBuffer +import java.time.OffsetDateTime interface IAuthorizable { val isAuthorized: Boolean @@ -142,6 +148,22 @@ class SyncSession : IAuthorizable { }; } + GJSyncOpcodes.syncStateExchange -> { + val dataBody = ByteArray(data.remaining()); + data.get(dataBody); + val json = String(dataBody, Charsets.UTF_8); + val syncSessionData = Serializer.json.decodeFromString<SyncSessionData>(json); + + Logger.i(TAG, "Received SyncSessionData from " + remotePublicKey); + + + send(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString()); + + val recentHistory = StateHistory.instance.getRecentHistory(syncSessionData.lastHistory); + if(recentHistory.size > 0) + sendJson(GJSyncOpcodes.syncHistory, recentHistory); + } + GJSyncOpcodes.syncExport -> { val dataBody = ByteArray(data.remaining()); val bytesStr = ByteArrayInputStream(data.array(), data.position(), data.remaining()); @@ -173,6 +195,39 @@ class SyncSession : IAuthorizable { val json = String(dataBody, Charsets.UTF_8); val subPackage = Serializer.json.decodeFromString<SyncSubscriptionsPackage>(json); handleSyncSubscriptionPackage(this, subPackage); + + val newestSub = subPackage.subscriptions.maxOf { it.creationTime }; + + val sesData = StateSync.instance.getSyncSessionData(remotePublicKey); + if(newestSub > sesData.lastSubscription) { + sesData.lastSubscription = newestSub; + StateSync.instance.saveSyncSessionData(sesData); + } + } + + GJSyncOpcodes.syncHistory -> { + val dataBody = ByteArray(data.remaining()); + data.get(dataBody); + val json = String(dataBody, Charsets.UTF_8); + val history = Serializer.json.decodeFromString<List<HistoryVideo>>(json); + Logger.i(TAG, "SyncHistory received ${history.size} videos from ${remotePublicKey}"); + + var lastHistory = OffsetDateTime.MIN; + for(video in history){ + val hist = StateHistory.instance.getHistoryByVideo(video.video, true, video.date); + if(hist != null) + StateHistory.instance.updateHistoryPosition(video.video, hist, true, video.position, video.date) + if(lastHistory < video.date) + lastHistory = video.date; + } + + if(lastHistory != OffsetDateTime.MIN && history.size > 1) { + val sesData = StateSync.instance.getSyncSessionData(remotePublicKey); + if (lastHistory > sesData.lastHistory) { + sesData.lastHistory = lastHistory; + StateSync.instance.saveSyncSessionData(sesData); + } + } } } } @@ -214,6 +269,9 @@ class SyncSession : IAuthorizable { } + inline fun <reified T> sendJson(opcode: UByte, data: T) { + send(opcode, Json.encodeToString<T>(data)); + } fun send(opcode: UByte, data: String) { send(opcode, data.toByteArray(Charsets.UTF_8)); } -- GitLab