Problem
Shipping Files
Shipping Events
Real World (tm)
Reflection
I'm an app developer
Apps need to synchronize data
I don't want to admin servers
I care about user privacy
Machines sharing a single dataset
Always writable, not always online
Adding a device needs to be fast for 50-300k rows
Cross-platform would be nice
Mobile devices go places
or they're just off
or out of credit
or using Telstra
We'll have to settle for
strong eventual consistency
and be Available under a Partition (see CAP theorem)
You're a practical developer
You build as little and as fast as possible
You pick a Backend As A Service
Represent your local rows 1-1 in your remote store
Cry when you instantly run out of quota
They're batching your writes, but charging your quota with 1 request per row
Treating initial synchronization like it's a DDoS attack
Cry about how bad their APIs are
Cry when they shut down
Realize you don't have to use them anymore
Go outside and rethink your life choices
1 Local row : 1 Remote row won't work
case class File(
name:UUID,
clientId:UUID,
localCreatedAt:LocalTime,
data:Array[Byte]
)
case class FileUploaded(name:UUID, serverCreatedAt:ServerTime)
trait FileServer {
  def getSince(time:ServerTime, n:Int) : List[File]
  def put(file:File) : FileUploaded
  def delete(name:UUID) : Unit // Chekhov's delete
}
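A minimal in-memory sketch of that file server, useful for tests. `InMemoryFileServer`, the `Long` time aliases and the logical clock standing in for server time are all assumptions for illustration, not part of the original design.

```scala
import java.util.UUID

object InMemoryFileStore {
  type LocalTime = Long   // assumption: times are plain Longs
  type ServerTime = Long

  case class File(name: UUID, clientId: UUID, localCreatedAt: LocalTime, data: Array[Byte])
  case class FileUploaded(name: UUID, serverCreatedAt: ServerTime)

  trait FileServer {
    def getSince(time: ServerTime, n: Int): List[File]
    def put(file: File): FileUploaded
    def delete(name: UUID): Unit
  }

  // Hypothetical implementation: a counter plays the role of server time.
  class InMemoryFileServer extends FileServer {
    private var clock: ServerTime = 0L
    private var files: Vector[(ServerTime, File)] = Vector.empty

    // Up to n files stored strictly after `time`, oldest first.
    def getSince(time: ServerTime, n: Int): List[File] =
      files.filter(_._1 > time).take(n).map(_._2).toList

    // Putting a name that already exists is a no-op that reports the original time.
    def put(file: File): FileUploaded =
      files.find(_._2.name == file.name) match {
        case Some((t, _)) => FileUploaded(file.name, t)
        case None =>
          clock += 1
          files = files :+ ((clock, file))
          FileUploaded(file.name, clock)
      }

    def delete(name: UUID): Unit =
      files = files.filterNot(_._2.name == name)
  }
}
```

Making `put` idempotent is a choice made here so that a client can safely retry an upload.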
property("clients converge") = forAll {
  (clients:List[Client]) =>
    val timeAllowed = 5.seconds
    // newFileServer() is a hypothetical factory: any FileServer implementation will do
    val server : FileServer = newFileServer()
    def clientsConverged : Boolean = {
      clients.zip(clients.drop(1)).forall { case (l,r) => l.files == r.files }
    }
    while (timeElapsed() < timeAllowed && !clientsConverged) {
      for (client <- clients) {
        // upload at most one pending file per round, then pull what others uploaded
        for {
          file <- client.unuploadedFiles.headOption
        } {
          client.upload(file, server)
        }
        client.downloadNewFilesFrom(server)
      }
    }
    clientsConverged
}
An eventually consistent distributed file store
All living clients will eventually agree upon a set of files
This forms our bedrock layer of synchronization
Only create and delete (no update)
What's missing: the ability to do anything but share binary files
We can't even update our binary files
case class Person(name:String, age:Int)
case class Event[T](
id:UUID,
localCreatedAt: LocalTime,
row:T,
removed:Boolean
)
"removed" denotes deletion, a tombstone
Serialize these into our files
Thanks to our distributed file store we now have an eventually consistent event set :)
But the events don't arrive in the same order on every client :(
val peopleTable : Map[UUID, Event[Person]]
We apply rows to our state machine as we get them
def apply(db:Map[UUID, Event[Person]],
event:Event[Person]) : Map[UUID,Event[Person]]
If we're working locally, intuitively we just replace the old event with the new one
But what about events from other clients?
Which #123 do we keep?
Always replace the existing one? Clients diverge
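A small sketch of why "always replace" diverges: two clients receive the same two events for #123 in opposite orders and end up with different rows. `applyReplace` and the sample events are made up for illustration, and ids and times are simplified to `String` and `Long`.

```scala
object ReplaceDiverges {
  case class Person(name: String, age: Int)
  case class Event[T](id: String, localCreatedAt: Long, row: T, removed: Boolean)

  type Db = Map[String, Event[Person]]

  // Naive apply: whatever arrives last overwrites what is there.
  def applyReplace(db: Db, event: Event[Person]): Db = db + (event.id -> event)

  val fromPhone  = Event("123", 12L, Person("steve", 25), removed = false)
  val fromLaptop = Event("123", 14L, Person("steve", 26), removed = false)

  private val empty: Db = Map.empty[String, Event[Person]]

  // Same two events, delivered in opposite orders.
  val clientA: Db = List(fromPhone, fromLaptop).foldLeft(empty)(applyReplace)
  val clientB: Db = List(fromLaptop, fromPhone).foldLeft(empty)(applyReplace)
}
```

Client A ends with the age-26 row and client B with the age-25 row, and no amount of further syncing of these two events fixes that.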
We need a merge function
def merge(a:Event[Person], b:Event[Person]) : Event[Person]
All clients must run identical merge functions on local and remote events alike to guarantee convergence.
Not +, -, /, * or `concat`: applying them to the same input twice changes the result, so they're not idempotent
max(3, max(1, 2)) == max(1, max(1,max(2,3)))
Set(1,2) ++ Set(1,3) == Set(3) ++ Set(1,2) ++ Set(2,3)
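A quick way to check a candidate merge function on a handful of values: it should be idempotent, commutative and associative. `lawful` is a hypothetical helper, and passing on a few samples is evidence, not proof.

```scala
object MergeLaws {
  // True when f is idempotent, commutative and associative over the sample values.
  def lawful[A](samples: Seq[A])(f: (A, A) => A): Boolean = {
    val idempotent  = samples.forall(a => f(a, a) == a)
    val commutative = (for (a <- samples; b <- samples) yield f(a, b) == f(b, a)).forall(identity)
    val associative = (for (a <- samples; b <- samples; c <- samples)
      yield f(f(a, b), c) == f(a, f(b, c))).forall(identity)
    idempotent && commutative && associative
  }

  val ints = Seq(1, 2, 3)
  val sets = Seq(Set(1, 2), Set(1, 3), Set(2, 3))

  val maxOk    = lawful(ints)((a, b) => math.max(a, b))   // max passes all three
  val unionOk  = lawful(sets)((a, b) => a ++ b)           // set union passes all three
  val plusOk   = lawful(ints)((a, b) => a + b)            // fails: 1 + 1 != 1
  val concatOk = lawful(Seq("a", "b"))((a, b) => a + b)   // fails: "a" + "a" != "a"
}
```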
Max applied to events
Establish a Total Ordering amongst events
Driven by some form of local time as we must allow immediate writes
case class Event[T](
id:String,
localCreatedAt: Event.LocalTime,
row:T,
removed:Boolean
)
implicit val personOrdering : Ordering[Person] = Ordering.by { person =>
(person.name, person.age)
}
val localTimeOnlyOrdering:Ordering[Event[Person]] = Ordering.by { ev =>
// include `removed` so two events that differ only by tombstone never tie
(ev.id, ev.localCreatedAt, ev.row, ev.removed)
}
Using Our Total Ordering
type MergeEvent[T] = (Event[T], Event[T]) => Event[T]
def merge[T](ordering: Ordering[Event[T]]) : MergeEvent[T] = {
{ (a: Event[T], b:Event[T]) =>
if (ordering.gt(a,b)) a else b
}
}
def sample() : Unit = {
val mergeF : MergeEvent[Person] = merge(localTimeOnlyOrdering)
val ev = mergeF(
Event("123", 12, Person("steve", 25), false),
Event("123", 14, Person("steve", 26), false)
)
}
// Event(123,14,Person(steve,26),false)
Extending merge to maps
type EventMap[T] = Map[String, Event[T]]
type MergeMap[T] = (EventMap[T], EventMap[T]) => EventMap[T]
def mergeMap[T](me:MergeEvent[T]) : MergeMap[T] = {
{ (a: EventMap[T], b:EventMap[T]) =>
a ++ b.map { case (bk,bv) =>
bk -> a.get(bk).map(av => me(av, bv)).getOrElse(bv)
}
}
}
def sampleMap() : Unit = {
val mergeMapF = mergeMap(merge(localTimeOnlyOrdering))
val a = Map(
"123" -> Event("123", 12, Person("steve", 25), false),
"456" -> Event("456", 13, Person("eve", 27), false)
)
val b = Map("123" -> Event("123", 14, Person("steve", 26), false))
val db = mergeMapF(a, b)
}
// Map(
// 123 -> Event(123,14,Person(steve,26),false),
// 456 -> Event(456,13,Person(eve,27),false)
// )
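property("merge function converges") = forAll {
  (events:List[Event[Person]], seed:Int) =>
    val permutations = 12
    val mergeMapF = mergeMap(merge(localTimeOnlyOrdering))
    val empty : EventMap[Person] = Map.empty
    // merge one batch of events into a single map
    def toMap(batch:List[Event[Person]]) : EventMap[Person] =
      batch.foldLeft(empty) { (accu, e) => mergeMapF(accu, Map(e.id -> e)) }
    // permutations of a list of batches; vary the seed so each run differs
    val permutatedEventBatches = for (i <- 0 until permutations) yield {
      val withDupes = dupe(events, seed + i) // idempotence
      val reordered = shuffle(withDupes, seed + i) // commutativity
      inRandomBatches(reordered, seed + i) // associativity, assumed List[List[Event[Person]]]
    }
    val results = permutatedEventBatches.map { batches =>
      batches.foldLeft(empty) { (accu, batch) => mergeMapF(accu, toMap(batch)) }
    }
    allEqual(results)
}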
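The same convergence check can be sketched without a property-testing library, using `scala.util.Random` for the duplicates, shuffles and batches. Everything in `ConvergenceCheck` is an assumption for illustration: times are `Long`, the ordering includes `removed` so no two distinct events tie, and the sample events are invented.

```scala
import scala.util.Random

object ConvergenceCheck {
  case class Person(name: String, age: Int)
  case class Event[T](id: String, localCreatedAt: Long, row: T, removed: Boolean)
  type EventMap = Map[String, Event[Person]]

  implicit val personOrdering: Ordering[Person] =
    Ordering.by[Person, (String, Int)](p => (p.name, p.age))
  val ordering: Ordering[Event[Person]] =
    Ordering.by[Event[Person], (String, Long, Person, Boolean)](e =>
      (e.id, e.localCreatedAt, e.row, e.removed))

  private val empty: EventMap = Map.empty[String, Event[Person]]

  def merge(a: Event[Person], b: Event[Person]): Event[Person] =
    if (ordering.gt(a, b)) a else b

  def mergeMap(a: EventMap, b: EventMap): EventMap =
    a ++ b.map { case (k, bv) => k -> a.get(k).map(merge(_, bv)).getOrElse(bv) }

  // Merge a batch into its own map first, so grouping (associativity) is exercised.
  def batchToMap(batch: Seq[Event[Person]]): EventMap =
    batch.foldLeft(empty)((m, e) => mergeMap(m, Map(e.id -> e)))

  def replay(events: Seq[Event[Person]], rnd: Random): EventMap = {
    val withDupes = events ++ events.filter(_ => rnd.nextBoolean())  // idempotence
    val reordered = rnd.shuffle(withDupes)                           // commutativity
    val batches   = reordered.grouped(1 + rnd.nextInt(4)).toSeq      // associativity
    batches.map(batchToMap).foldLeft(empty)(mergeMap)
  }

  // True when every randomised replay ends in the same map.
  def converges(events: Seq[Event[Person]], runs: Int, seed: Long): Boolean = {
    val rnd = new Random(seed)
    (0 until runs).map(_ => replay(events, rnd)).distinct.size <= 1
  }

  val sample = Seq(
    Event("123", 12L, Person("steve", 25), removed = false),
    Event("123", 14L, Person("steve", 26), removed = false),
    Event("456", 13L, Person("eve", 27), removed = false),
    Event("456", 13L, Person("eve", 27), removed = true)
  )
}
```

The last two sample events differ only in `removed`; without that field in the ordering they would tie, and the winner would depend on arrival order.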
Conflict-free Replicated Data Types
Merge functions and data model choices drastically affect system behaviour
We're looking at a small subset, just enough to represent a "traditional" independent row based system
CRDTs: Consistency without concurrency control
Last Writer Wins on local time is very limiting
The described system is only moderately useful
Users expect changes to go out instantly to other clients
This involves creating & uploading a new file
100 items a day * 2 years = 73k rows = 73k files
Downloading batches of 200 files at 1 fetch per 2 seconds = 12 minute sync
Almost reasonable for a desktop app
Death for a mobile app
We can merge files to reduce the file count
We're allowed to do this since our merge function allows arbitrary grouping (associativity)
The server can compact these rows
Merge, but for files
I like to sleep at night
I don't want to run more servers
Clients have the entire data set
We need to keep our file count low enough to facilitate a fast initial sync when adding a client
New files given generation 0
Compacted files are given generation:
max(generation of superseded files) + 1
Sort files by generation, take 50th percentile (50-80 is fine)
Compact all files of that generation and lower
The new file is given a new id and will be uploaded
It contains a record of superseded files
We can then safely delete superseded files locally and remotely
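The generation rule can be sketched on file metadata alone. `FileMeta`, `cutoffGeneration` and `compact` are hypothetical names, and the sketch only does the bookkeeping: merging the file contents would go through the usual merge function.

```scala
object Compaction {
  // Hypothetical metadata for a stored file; `supersedes` records the files it replaces.
  case class FileMeta(id: String, generation: Int, supersedes: Set[String] = Set.empty)

  // Generation found at the given percentile (0.5 = median) of files sorted by generation.
  def cutoffGeneration(files: Seq[FileMeta], percentile: Double): Int = {
    val sorted = files.map(_.generation).sorted
    sorted(math.min(sorted.size - 1, (sorted.size * percentile).toInt))
  }

  // Returns the compacted file's metadata plus the files left untouched.
  // Assumes `files` is non-empty.
  def compact(files: Seq[FileMeta], newId: String, percentile: Double = 0.5): (FileMeta, Seq[FileMeta]) = {
    val cutoff = cutoffGeneration(files, percentile)
    val (old, kept) = files.partition(_.generation <= cutoff)
    // new generation = max(generation of superseded files) + 1
    val merged = FileMeta(newId, old.map(_.generation).max + 1, old.map(_.id).toSet)
    (merged, kept)
  }
}
```

With generations 0, 0, 0, 1, 2 and the default percentile, the three generation-0 files are folded into one generation-1 file and the other two are left alone.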
Max Files Allowed = (Max allowed sync time)
/ (Time per request)
* (Files per request)
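The formula above as code, together with its inverse for checking the earlier 73k-file estimate. The function names are illustrative, and the 30 second budget in the usage note is an invented example, not a figure from the talk.

```scala
object SyncBudget {
  // Max files allowed = (max allowed sync time) / (time per request) * (files per request)
  def maxFiles(maxSyncSeconds: Double, secondsPerRequest: Double, filesPerRequest: Int): Int =
    (maxSyncSeconds / secondsPerRequest * filesPerRequest).toInt

  // Inverse: how long an initial sync takes for a given file count.
  def syncSeconds(fileCount: Int, secondsPerRequest: Double, filesPerRequest: Int): Double =
    math.ceil(fileCount.toDouble / filesPerRequest) * secondsPerRequest
}
```

`SyncBudget.syncSeconds(73000, 2.0, 200)` gives 730 seconds, the roughly 12 minute sync mentioned earlier. A hypothetical 30 second budget at the same rates, `SyncBudget.maxFiles(30.0, 2.0, 200)`, allows 3000 files.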
Duplicates are fine, our merge function is idempotent
Duplication becomes a performance optimization, not a correctness issue
Can be made extremely rare
Sort recently seen client ids, evenly space on continuum
Download before upload so we know what clients are around
A client's location is its threshold
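One possible reading of the continuum idea, as a sketch: rank the recently seen client ids, space them evenly on [0, 1), and map each client's position to its own compaction threshold so that clients rarely compact the same files at the same moment. Mapping the position onto the 50-80 percentile range is an assumption made here, and `position` and `threshold` are hypothetical names.

```scala
object CompactionThreshold {
  // Position of `self` among recently seen client ids, evenly spaced on [0, 1).
  def position(self: String, recentlySeen: Set[String]): Double = {
    val sorted = (recentlySeen + self).toSeq.sorted
    sorted.indexOf(self).toDouble / sorted.size
  }

  // ASSUMPTION: the position is mapped linearly onto a threshold range [low, high).
  def threshold(self: String, recentlySeen: Set[String], low: Double, high: Double): Double =
    low + position(self, recentlySeen) * (high - low)
}
```

Because every client sorts the same ids, each one computes the same layout without any coordination, provided they have downloaded recently enough to know who is around.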
Total Ordering (even where it doesn't exist)
Monotonically increasing
Locally, new rows > old rows
"Time" counter increments fastest on clients doing the most writes
This can have fairness drawbacks
Instead, when setting time, set it to:
max(desired_value, current_local_time())
Monotonically increasing "local time"
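A minimal sketch of such a clock. It assumes the "desired value" is the previous value plus one, and `LocalClock` with its injectable `now` function is a hypothetical helper.

```scala
object MonotonicClock {
  // `now` is injectable so the behaviour can be tested; it defaults to wall-clock milliseconds.
  class LocalClock(now: () => Long = () => System.currentTimeMillis()) {
    private var last: Long = Long.MinValue

    // Strictly greater than every earlier value, and never behind the wall clock.
    def next(): Long = synchronized {
      last = math.max(last + 1, now())
      last
    }
  }
}
```

If the device clock jumps backwards, new rows still sort above old ones; once the wall clock catches up, the counter follows it again instead of racing ahead on busy clients.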
case class Event[T](
id:UUID,
localCreatedAt: LocalTime,
serverCreatedAt: Option[ServerTime],
row:T,
removed:Boolean
)
Neutral, fairer observer between clients
How do we apply local rows before we know the server time?
1. Initially with local time only
2. After upload, reapply it again, with server time
case class Event[T](
id:UUID,
localCreatedAt: LocalTime,
minServerCreatedAt: ServerTime,
row:T,
removed:Boolean
)
Set to max local minServerCreatedAt observed
Allows unuploaded rows to win the local merge
After upload set:
event.minServerCreatedAt = max(
file.serverCreatedAt,
event.minServerCreatedAt
)
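The two steps can be sketched as pure functions on the event. `createLocal` and `afterUpload` are hypothetical names, and ids and times are simplified to `String` and `Long`.

```scala
object ServerTimeMerge {
  case class Event[T](id: String, localCreatedAt: Long, minServerCreatedAt: Long, row: T, removed: Boolean)

  // New local row: start from the highest server time this client has seen so far,
  // so the not-yet-uploaded row outranks everything already downloaded.
  def createLocal[T](id: String, localNow: Long, maxServerTimeSeen: Long, row: T): Event[T] =
    Event(id, localNow, maxServerTimeSeen, row, removed = false)

  // After upload: take the server's timestamp, but never move backwards.
  def afterUpload[T](event: Event[T], fileServerCreatedAt: Long): Event[T] =
    event.copy(minServerCreatedAt = math.max(fileServerCreatedAt, event.minServerCreatedAt))
}
```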
Forcing server monotonicity
Compute a derived boolean 'lost' based on whether server time is much later than local time
Order these 'lost' events very low
def lost[T](event:Event[T]) : Boolean = {
event.minServerCreatedAt > event.localCreatedAt + lostPeriod
}
val lostOrdering : Ordering[Event[Person]] = Ordering.by { ev =>
// include `removed` so two events that differ only by tombstone never tie
(ev.id, !lost(ev), ev.minServerCreatedAt, ev.localCreatedAt, ev.row, ev.removed)
}
Careful! A local only row can never beat the same row after being uploaded to the server
This function would cause divergence
Testing merge functions for convergence is easy
Reproducing real world experiences is hard
Only works within a single id, what about:
A given set of events must be merged deterministically
Nobody said you couldn't generate more events in response though
Never violate the laws of your merge function
Aim simple & test thoroughly
CRDTs are well suited to collaboration on human readable things
Like shared documents
+ Minimalist file server
(New BAAS client in a few days of work)
"Trust free computing in a hostile world"
gzip
Client Side Encryption
We don't trust HTTP transport
Why would you trust storage?
You can't lose/leak what you can't understand
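A sketch of what gzip plus client-side encryption could look like using only JDK classes. AES-GCM with a random 12-byte IV prepended to the ciphertext is an assumption made here, not necessarily the scheme behind these slides, and `SealedFiles`, `seal` and `unseal` are hypothetical names. Key management is out of scope.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.security.SecureRandom
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import javax.crypto.{Cipher, SecretKey}
import javax.crypto.spec.GCMParameterSpec

object SealedFiles {
  private val IvBytes = 12
  private val TagBits = 128
  private val random = new SecureRandom()

  private def gzip(bytes: Array[Byte]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(out)
    gz.write(bytes)
    gz.close()
    out.toByteArray
  }

  private def gunzip(bytes: Array[Byte]): Array[Byte] = {
    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](4096)
    Iterator.continually(in.read(buf)).takeWhile(_ != -1).foreach(n => out.write(buf, 0, n))
    in.close()
    out.toByteArray
  }

  // Compress first: ciphertext looks random and would not compress afterwards.
  def seal(key: SecretKey, plain: Array[Byte]): Array[Byte] = {
    val iv = new Array[Byte](IvBytes)
    random.nextBytes(iv) // a fresh IV per file, stored in front of the ciphertext
    val cipher = Cipher.getInstance("AES/GCM/NoPadding")
    cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TagBits, iv))
    iv ++ cipher.doFinal(gzip(plain))
  }

  // Fails with an exception if the bytes were tampered with or the key is wrong.
  def unseal(key: SecretKey, sealedBytes: Array[Byte]): Array[Byte] = {
    val (iv, body) = sealedBytes.splitAt(IvBytes)
    val cipher = Cipher.getInstance("AES/GCM/NoPadding")
    cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TagBits, iv))
    gunzip(cipher.doFinal(body))
  }
}
```

The server only ever stores the output of `seal`, so it can hand files back and delete them without being able to read them.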
The server component remains a replaceable + ignorant component in your architecture
Sleeping developers, happy users
Thanks for listening