From 241f8a41221272733ef4a2aa9bf34d4fb8624a00 Mon Sep 17 00:00:00 2001 From: Koen <koen@pop-os.localdomain> Date: Wed, 21 Jun 2023 12:51:49 +0200 Subject: [PATCH] Ensure synchronized ingestion and publishing. --- .../futo/polycentric/core/ProcessHandle.kt | 40 ++++++++++--------- .../futo/polycentric/core/Synchronization.kt | 12 ++++-- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/app/src/main/java/com/futo/polycentric/core/ProcessHandle.kt b/app/src/main/java/com/futo/polycentric/core/ProcessHandle.kt index 257c6a5..d40ceb0 100644 --- a/app/src/main/java/com/futo/polycentric/core/ProcessHandle.kt +++ b/app/src/main/java/com/futo/polycentric/core/ProcessHandle.kt @@ -202,25 +202,27 @@ class ProcessHandle constructor( } private fun publish(contentType: Long, content: ByteArray, lwwElementSet: LWWElementSet?, lwwElement: LWWElement?, references: MutableList<Protocol.Reference>): Pointer { - val processState = Store.instance.getProcessState(system, processSecret.process) - val event = Event( - system, - processSecret.process, - processState.logicalClock + 1L, - contentType, - content, - Protocol.VectorClock.newBuilder().addAllLogicalClocks(listOf()).build(), - lwwElementSet, - lwwElement, - references, - processState.indices!!.toProto(), - unixMilliseconds = System.currentTimeMillis() - ) - - val eventBuffer = event.toProto().toByteArray() - val signedEvent = SignedEvent(processSecret.system.sign(eventBuffer), eventBuffer) - - return ingest(signedEvent) + synchronized(this) { + val processState = Store.instance.getProcessState(system, processSecret.process) + val event = Event( + system, + processSecret.process, + processState.logicalClock + 1L, + contentType, + content, + Protocol.VectorClock.newBuilder().addAllLogicalClocks(listOf()).build(), + lwwElementSet, + lwwElement, + references, + processState.indices!!.toProto(), + unixMilliseconds = System.currentTimeMillis() + ) + + val eventBuffer = event.toProto().toByteArray() + val signedEvent = SignedEvent(processSecret.system.sign(eventBuffer), eventBuffer) + + return ingest(signedEvent) + } } fun ingest(signedEvent: SignedEvent): Pointer { diff --git a/app/src/main/java/com/futo/polycentric/core/Synchronization.kt b/app/src/main/java/com/futo/polycentric/core/Synchronization.kt index db0b3d8..dab6c4b 100644 --- a/app/src/main/java/com/futo/polycentric/core/Synchronization.kt +++ b/app/src/main/java/com/futo/polycentric/core/Synchronization.kt @@ -27,8 +27,10 @@ class Synchronization { } private fun ingest(processHandle: ProcessHandle, events: Protocol.Events) { - for (rawEvent in events.eventsList) { - processHandle.ingest(SignedEvent.fromProto(rawEvent)) + synchronized(processHandle) { + for (rawEvent in events.eventsList) { + processHandle.ingest(SignedEvent.fromProto(rawEvent)) + } } } @@ -72,8 +74,10 @@ class Synchronization { } fun saveBatch(processHandle: ProcessHandle, events: Protocol.Events) { - for (e in events.eventsList) { - processHandle.ingest(SignedEvent.fromProto(e)) + synchronized(processHandle) { + for (e in events.eventsList) { + processHandle.ingest(SignedEvent.fromProto(e)) + } } } -- GitLab