Building trust free apps in a hostile world

Me

  • Michael Shaw
  • 50% of Cosmic Teapot
  • 7 years of Scala
  • Scala, Rust, Swift

Problem

Shipping Files

Shipping Events

Real World (tm)

Reflection

I'm an app developer

Apps need to synchronize data

I don't want to admin servers

I care about user privacy

Problem

Machines sharing a single dataset

Always writable, not always online

Adding a device needs to be fast for 50-300k rows

Cross-platform would be nice


Maybe one "nines"

Mobile devices go places

or they're just off

or out of credit

or using Telstra

We'll have to settle for
strong eventual consistency

and be Available under a Partition (see CAP theorem)

Let me tell you a story

You're a practical developer

You build as little and as fast as possible

You pick a Backend As A Service

 
 

Represent your local rows 1-1 in your remote store

Cry when you instantly run out of quota

They're batching your writes, but charging your quota with 1 request per row

Treating initial synchronization like it's a DDOS attack

Cry about how bad their APIs are

 

Cry when they shut down

Realize you don't have to use them anymore

Go outside and rethink your life choices

Starting over

Batching

1 Local row : 1 Remote row won't work

Congratulate yourself
on inventing log shipping

Event Sourcing
if you're a consultant

Shipping Files

Simple Distributed File Store

case class File(
  name:UUID,
  clientId:UUID,
  localCreatedAt:LocalTime,
  data:Array[Byte]
)
case class FileUploaded(name:UUID, serverCreatedAt:ServerTime)

trait FileServer {
  def getSince(time:ServerTime, n:Int) : Seq[File]
  def put(file:File) : FileUploaded
  def delete(name:UUID) : Unit // Chekhov's delete
}

  • Upload local files that aren't on the server
  • Save a FileUploaded record when you upload/download a file
  • Download new files in ServerTime ascending order for an easier delta
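
The three rules above can be sketched against an in-memory stand-in for the server. This is only an illustration under assumptions: `InMemoryServer`, `Client`, `sync` and the `cursor` field are hypothetical names, server time is modelled as a simple counter, and `data` is a `Vector[Byte]` so that files compare by value.

```scala
import java.util.UUID

object FileStoreSketch {
  type ServerTime = Long

  final case class File(name: UUID, clientId: UUID, data: Vector[Byte])
  final case class FileUploaded(name: UUID, serverCreatedAt: ServerTime)

  // Hypothetical in-memory server: stamps each new file with a strictly increasing time.
  final class InMemoryServer {
    private var clock: ServerTime = 0L
    private var stored: Vector[(ServerTime, File)] = Vector.empty

    def put(file: File): FileUploaded = synchronized {
      stored.find(_._2.name == file.name) match {
        case Some((t, _)) => FileUploaded(file.name, t) // re-upload changes nothing
        case None =>
          clock += 1
          stored = stored :+ (clock -> file)
          FileUploaded(file.name, clock)
      }
    }

    // Files stamped after `time`, in ascending server time, at most n of them.
    def getSince(time: ServerTime, n: Int): Vector[(ServerTime, File)] = synchronized {
      stored.filter(_._1 > time).take(n)
    }
  }

  final class Client(val id: UUID) {
    var files: Map[UUID, File] = Map.empty
    var uploaded: Map[UUID, FileUploaded] = Map.empty
    var cursor: ServerTime = 0L // highest server time downloaded so far

    def create(data: Vector[Byte]): File = {
      val f = File(UUID.randomUUID(), id, data)
      files += (f.name -> f)
      f
    }

    def sync(server: InMemoryServer, batch: Int = 100): Unit = {
      // 1. Upload local files that have no FileUploaded record yet.
      for (f <- files.values if !uploaded.contains(f.name))
        uploaded += (f.name -> server.put(f))
      // 2. Download newer files page by page, recording a FileUploaded for each.
      var page = server.getSince(cursor, batch)
      while (page.nonEmpty) {
        for ((t, f) <- page) {
          files += (f.name -> f)
          uploaded += (f.name -> FileUploaded(f.name, t))
          cursor = math.max(cursor, t)
        }
        page = server.getSince(cursor, batch)
      }
    }
  }
}
```

Because the cursor only moves forward in server time, a client resumes the delta from where it stopped instead of listing everything again.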

Property test for convergence

property("clients converge") = forAll {
    (clients:List[Client]) =>
  val timeAllowed = 5.seconds
  val server = new FileServer()

  def clientsConverged : Boolean = {
    clients.zip(clients.drop(1)).forall { case (l,r) => l.files == r.files }
  }

  while (timeElapsed() < timeAllowed && !clientsConverged) {
    for (client <- clients) {
      for {
        file <- client.unuploadedFiles.headOption
      } {
        client.upload(file, server)
      }
      client.downloadNewFilesFrom(server)
    }
  }

  clientsConverged
}

Improving our tests

  • Run clients in separate threads with separate
    server resources
  • Add configurable simulated error rates + latency
  • Test against remote backends

Practical Property-Based Testing - Charles O'Farrell

Property Testing Part 1 Part 2

What do we have?

An eventually consistent distributed file store

All living clients will eventually agree upon a set of files

This forms our bedrock layer of synchronization

Only create and delete (no update)

What are we missing?

The ability to do anything but share binary files

We can't even update our binary files

Shipping Events

Events

case class Person(name:String, age:Int)

case class Event[T](
  id:UUID,
  localCreatedAt: LocalTime,
  row:T,
  removed:Boolean
)

"removed" denotes deletion, a tombstone

Serialize these in to our files

App Structure

Thanks to our distributed file store we now have an eventually consistent event set :)

 

That don't arrive in a uniform order across clients :(

Derived State Machine

val peopleTable : Map[UUID, Event[Person]]

We apply rows to our state machine as we get them

def apply(db:Map[UUID, Event[Person]],
          event:Event[Person]) : Map[UUID,Event[Person]]

If we're working locally, intuitively we just replace the old event with the new one

But what about events from other clients?

Which #123 do we keep?

Resolution

Always replace the existing one? Clients diverge

We need a merge function

def merge(a:Event[Person], b:Event[Person]) : Event[Person]

Merge must allow

  • Reordering of events (commutativity)
    x • y = y • x
  • Different batching/grouping (associativity)
    ( x • y ) • z = x • ( y • z )
  • Duplicate events (idempotence)
    x • x • x = x

All clients must run identical merge functions on local and remote events alike to guarantee convergence.

Binary Functions

Not + - / * or `concat` as they're not idempotent

  • Max

    max(3, max(1, 2)) == max(1, max(1,max(2,3)))

  • Set Addition

    Set(1,2) ++ Set(1,3) == Set(3) ++ Set(1,2) ++ Set(2,3)
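
A small sketch of the three laws as executable checks, to show why max and set union qualify while + does not. The names `MergeLaws`, `commutative`, `associative` and `idempotent` are made up for this example.

```scala
object MergeLaws {
  // x • y == y • x
  def commutative[A](f: (A, A) => A)(x: A, y: A): Boolean =
    f(x, y) == f(y, x)

  // (x • y) • z == x • (y • z)
  def associative[A](f: (A, A) => A)(x: A, y: A, z: A): Boolean =
    f(f(x, y), z) == f(x, f(y, z))

  // x • x == x
  def idempotent[A](f: (A, A) => A)(x: A): Boolean =
    f(x, x) == x

  val max: (Int, Int) => Int = (a, b) => math.max(a, b)
  val union: (Set[Int], Set[Int]) => Set[Int] = (a, b) => a ++ b
  val plus: (Int, Int) => Int = (a, b) => a + b
}
```

`plus` passes the first two checks but fails the third (3 + 3 is not 3), which is exactly what a duplicated delivery would expose.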

Last Writer Wins

Max applied to events

Establish a Total Ordering amongst events

Driven by some form of local time as we must allow immediate writes

case class Event[T](
  id:String,
  localCreatedAt: Event.LocalTime,
  row:T,
  removed:Boolean
)

implicit val personOrdering : Ordering[Person] = Ordering.by { person =>
  (person.name, person.age)
}

val localTimeOnlyOrdering:Ordering[Event[Person]] = Ordering.by { ev =>
  (ev.id, ev.localCreatedAt, ev.row)
}

Using Our Total Ordering

type MergeEvent[T] = (Event[T], Event[T]) => Event[T]

def merge[T](ordering: Ordering[Event[T]]) : MergeEvent[T] = {
  { (a: Event[T], b:Event[T]) =>
    if (ordering.gt(a,b)) a else b
  }
}

def sample() : Unit = {
  val mergeF : MergeEvent[Person] = merge(localTimeOnlyOrdering)
  val ev = mergeF(
    Event("123", 12, Person("steve", 25), false),
    Event("123", 14, Person("steve", 26), false)
  )
}

// Event(123,14,Person(steve,26),false)

Extending merge to maps

type EventMap[T] = Map[String, Event[T]]
type MergeMap[T] = (EventMap[T], EventMap[T]) => EventMap[T]

def mergeMap[T](me:MergeEvent[T]) : MergeMap[T] = {
  { (a: EventMap[T], b:EventMap[T]) =>
    a ++ b.map { case (bk,bv) =>
      bk -> a.get(bk).map(av => me(av, bv)).getOrElse(bv)
    }
  }
}

def sampleMap() : Unit = {
  val mergeMapF = mergeMap(merge(localTimeOnlyOrdering))
  val a = Map(
    "123" -> Event("123", 12, Person("steve", 25), false),
    "456" -> Event("456", 13, Person("eve", 27), false)
  )
  val b = Map("123" -> Event("123", 14, Person("steve", 26), false))
  val db = mergeMapF(a, b)
}
// Map(
//   123 -> Event(123,14,Person(steve,26),false),
//   456 -> Event(456,13,Person(eve,27),false)
// )
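
Tombstones ride through the same merge. The sketch below is an illustration only: `TombstoneSketch`, `applyEvent` and `visible` are hypothetical names, time is a plain `Long`, and `removed` is added to the ordering as a tie-break (an assumption, so that two events differing only in `removed` still order deterministically).

```scala
object TombstoneSketch {
  final case class Person(name: String, age: Int)
  final case class Event[T](id: String, localCreatedAt: Long, row: T, removed: Boolean)

  type Db = Map[String, Event[Person]]

  // Last writer wins on local time; the remaining fields only break ties.
  private val ordering: Ordering[Event[Person]] =
    Ordering.by((e: Event[Person]) => (e.localCreatedAt, e.removed, e.row.name, e.row.age))

  def merge(a: Event[Person], b: Event[Person]): Event[Person] =
    if (ordering.gt(a, b)) a else b

  // Apply one event to the derived state, merging with whatever is already there.
  def applyEvent(db: Db, e: Event[Person]): Db =
    db.updated(e.id, db.get(e.id).map(merge(_, e)).getOrElse(e))

  // What the app shows: every row whose winning event is not a tombstone.
  def visible(db: Db): Map[String, Person] =
    db.collect { case (id, e) if !e.removed => id -> e.row }
}
```

The tombstone stays in the map after the delete. If it were dropped, a late-arriving older event for the same id would resurrect the row.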

Revisiting Conflicts

Testing Merge functions

property("merge function converges") = forAll {
        (events:List[Event[Person]], seed:Int) =>
  val permutations = 12
  // permutations of a list of batches
  val permutatedEventBatches = for(i <- 0 until permutations) yield {
    // vary the seed so each permutation differs (assumes the helpers are deterministic in seed)
    val withDupes = dupe(events, seed + i) // idempotence
    val reordered = shuffle(withDupes, seed + i) // commutativity
    inRandomBatches(reordered, seed + i) // associativity
  }
  val results = permutatedEventBatches.map { batches =>
    batches.foldLeft(Map.empty) { (accu, e) => merge(accu, e) }
  }
  allEqual(results)
}

Congratulations, you've invented CRDTs

Conflict-free Replicated Data Types

Merge functions and data model choices drastically affect system behaviour

We're looking at a small subset, just enough to represent a "traditional" independent row based system

CRDTs: Consistency without concurrency control

Ship It?

Last Writer Wins on local time is very limiting

The described system is only moderately useful

Real World (tm)

Latency & Performance

Users expect changes to go out instantly to other clients

This involves creating & uploading a new file

100 items a day * 2 years = 73k rows = 73k files

Downloading batches of 200 files at 1 fetch per 2 seconds = 12 minute sync

Almost reasonable for a desktop app

Death for a mobile app
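
The arithmetic behind that 12 minute figure, as a sketch. `SyncEstimate` and its function names are invented for this example; the inputs are the numbers from the slide.

```scala
object SyncEstimate {
  // Number of requests needed to pull every file, rounding up.
  def fetches(files: Int, filesPerFetch: Int): Int =
    (files + filesPerFetch - 1) / filesPerFetch

  // Total initial sync time if each request takes secondsPerFetch.
  def syncSeconds(files: Int, filesPerFetch: Int, secondsPerFetch: Double): Double =
    fetches(files, filesPerFetch) * secondsPerFetch

  val files: Int = 100 * 365 * 2 // 100 items a day for 2 years
  val seconds: Double = syncSeconds(files, filesPerFetch = 200, secondsPerFetch = 2.0)
  val minutes: Double = seconds / 60
}
```

73,000 files in batches of 200 is 365 requests, so 730 seconds, a little over 12 minutes.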

Compaction

We can merge files to reduce the file count

We're allowed to do this since our merge function allows arbitrary grouping (associativity)

The server can compact these rows

Merge, but for files

Wait

I like to sleep at night

I don't want to run more servers

Client side compaction

Clients have the entire data set

We need to keep our file count low enough to facilitate a fast initial sync when adding a client

  • Always compact unuploaded files before uploading
  • Compact files only when threshold reached

Generational Compaction

New files given generation 0

Compacted files are given generation:
max(generation of superseded files) + 1

Sort files by generation, take 50th percentile (50-80 is fine)

Compact all files of that generation and lower
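
One possible reading of that selection rule, as a sketch. `FileMeta`, `Plan` and `plan` are hypothetical names, and the way the percentile is turned into an index is an assumption rather than something the slides specify.

```scala
object GenerationalCompaction {
  final case class FileMeta(name: String, generation: Int)
  final case class Plan(superseded: Vector[FileMeta], newGeneration: Int)

  // Pick the generation at the given percentile and compact everything at or below it.
  def plan(files: Seq[FileMeta], percentile: Double = 0.5): Option[Plan] = {
    if (files.size < 2) None
    else {
      val sorted = files.sortBy(_.generation).toVector
      val idx = math.min(sorted.size - 1, (sorted.size * percentile).toInt)
      val cutoff = sorted(idx).generation
      val chosen = sorted.filter(_.generation <= cutoff)
      // Compacting a single file would only rename it, so skip that case.
      if (chosen.size < 2) None
      else Some(Plan(chosen, chosen.map(_.generation).max + 1))
    }
  }
}
```

With generations 0, 0, 0, 0, 1, 1, 2, 3 the 50th percentile lands on generation 1, so six files collapse into one new generation 2 file and the older, larger files are left alone.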

Post compaction

The new file is given a new id and will be uploaded

It contains a record of superseded files

We can then safely delete superseded files locally and remotely

Upper Threshold

Max Files Allowed = (Max allowed sync time)
/ (Time per request)
* (Files per request)
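
A worked instance of the formula. The 60 second budget is an example value chosen here, not a number from the talk; the 2 seconds per request and 200 files per request reuse the earlier latency figures.

```scala
object UpperThreshold {
  // Max Files Allowed = (max allowed sync time) / (time per request) * (files per request)
  def maxFiles(maxSyncSeconds: Double, secondsPerRequest: Double, filesPerRequest: Int): Int =
    (maxSyncSeconds / secondsPerRequest * filesPerRequest).toInt

  // Example: a one minute initial sync budget.
  val example: Int = maxFiles(maxSyncSeconds = 60.0, secondsPerRequest = 2.0, filesPerRequest = 200)
}
```

A one minute budget allows 30 requests, so compaction has to keep the remote file count under 6,000.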

Accidental Simultaneous Compaction

Duplicates are fine, our merge function is idempotent

Duplication becomes a performance optimization, not a correctness issue

Can be made extremely rare

Dynamic Threshold

Sort recently seen client ids, evenly space on continuum

Download before upload so we know what clients are around

A client's location is its threshold

Time Revisited

Local Time Sucks

"Local" Lamport Clocks

  1. Increment "time" before any event
  2. Include "time" in any uploaded file
  3. When downloading a file, set "time" to max(our_time, message_time)
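
The three rules map onto a very small class. A minimal sketch, with `LamportClock`, `tick` and `observe` as names chosen for this example:

```scala
object LamportSketch {
  final class LamportClock(private var time: Long = 0L) {
    // Rule 1: increment before any local event and stamp the event with the result.
    def tick(): Long = { time += 1; time }

    // Rule 3: on download, jump forward to the file's time if it is ahead of ours.
    def observe(messageTime: Long): Long = {
      time = math.max(time, messageTime)
      time
    }

    // Rule 2: the value to include in any uploaded file.
    def now: Long = time
  }
}
```

After observing a file stamped 3, the next local `tick()` yields 4, so anything written after seeing that file orders after it.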

Total Ordering (even where it doesn't exist)

Monotonically increasing

Locally, new rows > old rows

Offline clients

"Time" counter increments fastest on clients doing the most writes

This can have fairness drawbacks

Instead, when setting time, set it to:
max(desired_value, current_local_time())

Monotonically increasing "local time"
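
A sketch of that variant, where the counter is never allowed to fall behind the wall clock. `WallClockLamport` and `Clock` are hypothetical names, and the wall clock is passed in as a function so the behaviour can be exercised with fixed values.

```scala
object WallClockLamport {
  final class Clock(wallClock: () => Long) {
    private var time = 0L

    // Never go below the wall clock, never go backwards.
    private def set(desired: Long): Long = {
      time = math.max(desired, wallClock())
      time
    }

    // Before any local event.
    def tick(): Long = set(time + 1)

    // When downloading a file stamped with messageTime.
    def observe(messageTime: Long): Long = set(math.max(time, messageTime))
  }
}
```

In an app the argument might be `() => System.currentTimeMillis()`. If the device clock is wound back, `tick()` keeps counting up from the last value instead of following it.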

Server Time

case class Event[T](
  id:UUID,
  localCreatedAt: LocalTime,
  serverCreatedAt: Option[ServerTime],
  row:T,
  removed:Boolean
)

Neutral, fairer observer between clients

Server Time Alone Sucks

How do we apply local rows before we know the server time?

Apply twice

1. Initially with local time only

2. After upload, reapply it again, with server time

Min Server Time

case class Event[T](
  id:UUID,
  localCreatedAt: LocalTime,
  minServerCreatedAt: ServerTime,
  row:T,
  removed:Boolean
)

Set to max local minServerCreatedAt observed

Allows unuploaded rows to win the local merge

After upload set:

event.minServerCreatedAt = max(
  file.serverCreatedAt,
  event.minServerCreatedAt
)

Forcing server monotonicity
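
The two stamping steps can be sketched as pure functions. This assumes times are plain `Long` values; `createLocal`, `afterUpload` and `maxServerTimeSeen` are names invented for the example.

```scala
object MinServerTimeSketch {
  final case class Event[T](
    id: String,
    localCreatedAt: Long,
    minServerCreatedAt: Long,
    row: T,
    removed: Boolean
  )

  // A new local event starts at the highest server time this client has observed,
  // so it can win the local merge before it has been uploaded.
  def createLocal[T](id: String, localTime: Long, row: T, maxServerTimeSeen: Long): Event[T] =
    Event(id, localTime, maxServerTimeSeen, row, removed = false)

  // After upload the stamp can only move forward, never back.
  def afterUpload[T](event: Event[T], fileServerCreatedAt: Long): Event[T] =
    event.copy(minServerCreatedAt = math.max(fileServerCreatedAt, event.minServerCreatedAt))
}
```

Because the stamp never decreases, the uploaded version of an event never orders below the version the client applied locally.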

Best of both worlds?

Compute a derived boolean 'lost' based on if server time is much later than local time

Order these 'lost' events very low

def lost[T](event:Event[T]) : Boolean = {
  event.minServerCreatedAt > event.localCreatedAt + lostPeriod
}
val lostOrdering : Ordering[Event[Person]] = Ordering.by { ev =>
  (ev.id, !lost(ev), ev.minServerCreatedAt, ev.localCreatedAt, ev.row)
}

Careful! A local only row can never beat the same row after being uploaded to the server

This function would cause divergence

Testing merge functions for convergence is easy

Reproducing real world experiences is hard

Merge has tunnel vision

Only works within a single id, what about:

  • Foreign keys
  • Uniqueness constraints

A given set of events must be merged deterministically

Nobody said you couldn't generate more events in response though

Outside of our safe borders

  • Protecting deleted parents in an FK relationship through restamping the parent if a remote client deletes it
  • Prompting the user to help resolve uniqueness constraints (or take newer)
  • Writing superseded rows off to the "side" and allow the user to revive/restamp them

Use at your own peril

Never violate the laws of your merge function

Aim simple & test thoroughly

Outside of mobile

CRDTs are well suited to collaboration on human readable things

Like shared documents

What do we have?

+ Minimalist file server
(New BAAS client in a few days of work)

What was the title of the talk again?

"Trust free computing in a hostile world"

Files, Files, Files

gzip

Client Side Encryption

Privacy

We don't trust HTTP transport

Why would you trust storage?

You can't lose/leak what you can't understand

The server component remains a replaceable + ignorant component in your architecture
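
A minimal sketch of sealing a file on the client before upload, using only the JDK: gzip first, then AES-GCM. The names `SealedFiles`, `seal` and `unseal` are hypothetical, the choice of AES-GCM with a 12 byte IV prepended to the ciphertext is an assumption (the talk does not name a cipher), and key distribution between a user's devices is out of scope here.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.security.SecureRandom
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import javax.crypto.{Cipher, KeyGenerator, SecretKey}
import javax.crypto.spec.GCMParameterSpec

object SealedFiles {
  private val random = new SecureRandom()
  private val IvBytes = 12
  private val TagBits = 128

  def newKey(): SecretKey = {
    val gen = KeyGenerator.getInstance("AES")
    gen.init(256)
    gen.generateKey()
  }

  private def gzip(data: Array[Byte]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(out)
    gz.write(data)
    gz.close() // flushes the gzip trailer
    out.toByteArray
  }

  private def gunzip(data: Array[Byte]): Array[Byte] = {
    val in = new GZIPInputStream(new ByteArrayInputStream(data))
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](4096)
    var n = in.read(buf)
    while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
    in.close()
    out.toByteArray
  }

  // Compress, then encrypt with a fresh random IV. Output layout: IV followed by ciphertext.
  def seal(key: SecretKey, plain: Array[Byte]): Array[Byte] = {
    val iv = new Array[Byte](IvBytes)
    random.nextBytes(iv)
    val cipher = Cipher.getInstance("AES/GCM/NoPadding")
    cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TagBits, iv))
    iv ++ cipher.doFinal(gzip(plain))
  }

  // Decrypt (which also authenticates the blob), then decompress.
  def unseal(key: SecretKey, blob: Array[Byte]): Array[Byte] = {
    val (iv, body) = blob.splitAt(IvBytes)
    val cipher = Cipher.getInstance("AES/GCM/NoPadding")
    cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TagBits, iv))
    gunzip(cipher.doFinal(body))
  }
}
```

Compression has to come before encryption because ciphertext does not compress. The server only ever stores the sealed bytes and a file name.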

Sleeping developers, happy users

Thanks for listening