Scalatra for Double Record Accounting

Scala May 26, 2019

In this post, we will explore an alternative database schema design for tracking physical inventory. Our previous posts focused on using a single database record to model a distinct location and SKU (location and SKU being represented as basic strings in this example). But what would it look like if we designed our schema around the idea of double-entry accounting? That would mean the quantity of a given location and SKU would be defined by the sum of multiple database records (we couldn't just look at a single record anymore to find its current quantity).

A good primer for double-entry accounting is Martin Fowler's blog post on the subject https://www.martinfowler.com/eaaDev/AccountingNarrative.html.

Previous Blog Posts In This Series

This post will significantly reference several of my previous blog posts. We now seek to draw a comparison between this double-entry schema and a design that uses a single record (for each location and SKU relationship). I would venture to guess that most developers would not start with a double-entry model.

Source Code

Source code for this blog post: https://bitbucket.org/honstain/scalatra-double-record-transfer-service/src/master/

Designing the Schema for Double-Entry

Each transfer (physical movement of goods from one location to another) will be modeled with two new records, one decrementing the qty from the source location and one incrementing the qty for the destination.

Starting with a single location LOC-01 that contains 2 units of SKU-01

id  location  sku     qty
1   LOC-01    SKU-01    2

If we wanted to move 1 unit of SKU-01 to the location LOC-02, instead of updating record id:1 we would create two new records.

id  location  sku     qty
1   LOC-01    SKU-01    2
2   LOC-01    SKU-01   -1
3   LOC-02    SKU-01    1

This means that to calculate the current quantity of LOC-01 and SKU-01 we would need to sum the quantity across records id:1 and id:2 (2 + -1 = 1 unit remaining in LOC-01).

Using this design, we never update old database records; we only create new ones.

All our operations (creating, moving, and decrementing inventory) would be handled by a DB insert. Because we treat existing records as immutable, there are fewer instances where the DB needs to manage the overhead of locking specific rows. We saw in our previous post how we ended up doing fine-grained row-level locking in order to provide consistency: we locked everything involved in the transaction, both the source and the destination, which can impact all the other database queries running against that table.

Just like in our original example, let's start with a DAO and a way to retrieve all the database records for testing.

package org.bitbucket.honstain.inventory.dao

import org.slf4j.{Logger, LoggerFactory}
import slick.jdbc.{PostgresProfile, TransactionIsolation}
import slick.jdbc.PostgresProfile.api._

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object TRANSACTION {
  val ADJUST = "adjust"
  val TRANSFER = "transfer"
}

case class InventoryDoubleRecord(
                                  id: Option[Int],
                                  sku: String,
                                  qty: Int,
                                  txnType: String,
                                  location: String
                                )

class InventoryDoubleRecords(tag: Tag) extends Table[InventoryDoubleRecord](tag, "inventory_double") {
  def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
  def sku = column[String]("sku")
  def qty = column[Int]("qty")
  def txnType = column[String]("type")
  def location = column[String]("location")
  def * =
    (id.?, sku, qty, txnType, location) <> (InventoryDoubleRecord.tupled, InventoryDoubleRecord.unapply)
}

object InventoryDoubleRecordDao extends TableQuery(new InventoryDoubleRecords(_)) {

  val logger: Logger = LoggerFactory.getLogger(getClass)

  // Return the raw double-entry records (the tests use this to inspect individual rows).
  def findAllRaw(db: PostgresProfile.backend.DatabaseDef): Future[Seq[InventoryDoubleRecord]] = {
    db.run(this.result)
  }

  // Return the current quantity for each SKU/location pair by summing its records.
  def findAll(db: PostgresProfile.backend.DatabaseDef): Future[Seq[(String, String, Option[Int])]] = {
    val groupByQuery = this.groupBy(x => (x.sku, x.location))
      .map{ case ((sku, location), group) => (sku, location, group.map(_.qty).sum) }
      .result
    db.run(groupByQuery)
  }
}
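
While the transfer operation is not the focus of this post, under this schema it is nothing more than a pair of inserts. Here is a minimal sketch of a method that could be added to InventoryDoubleRecordDao (the repo linked at the end of this post contains an actual transfer implementation, which may differ):

  // Sketch only: a transfer is two inserts in a single transaction, one negative
  // record for the source location and one positive record for the destination.
  def transfer(db: PostgresProfile.backend.DatabaseDef,
               sku: String,
               qty: Int,
               source: String,
               destination: String): Future[Unit] = {
    val inserts = DBIO.seq(
      this += InventoryDoubleRecord(None, sku, -qty, TRANSACTION.TRANSFER, source),
      this += InventoryDoubleRecord(None, sku, qty, TRANSACTION.TRANSFER, destination)
    )
    db.run(inserts.transactionally)
  }

Because both rows are written in one transaction, the two new records cancel out to zero across locations, which is exactly the double-entry invariant.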

We will also make a basic test to start us off.

package org.bitbucket.honstain.inventory.dao

import org.bitbucket.honstain.PostgresSpec

import org.scalatest.BeforeAndAfter
import org.scalatra.test.scalatest._
import slick.dbio.DBIO
import slick.jdbc.PostgresProfile.api._

import scala.concurrent.Await
import scala.concurrent.duration.Duration


class InventoryDoubleRecordDaoTests extends ScalatraFunSuite with BeforeAndAfter with PostgresSpec {

  def createInventoryTable: DBIO[Int] =
    sqlu"""
          CREATE TABLE inventory_double
          (
            id bigserial NOT NULL,
            sku text,
            qty integer,
            type text,
            location text,
            CONSTRAINT pk_double PRIMARY KEY (id)
          );
      """
  def dropInventoryTable: DBIO[Int] =
    sqlu"""
          DROP TABLE IF EXISTS inventory_double;
      """

  before {
    Await.result(database.run(createInventoryTable), Duration.Inf)
  }

  after {
    Await.result(database.run(dropInventoryTable), Duration.Inf)
  }

  val TEST_SKU = "NewSku"
  val BIN_01 = "Bin-01"
  val BIN_02 = "Bin-02"

  test("findAll when empty") {
    val futureFind = InventoryDoubleRecordDao.findAll(database)
    val findResult: Seq[InventoryDoubleRecord] = Await.result(futureFind, Duration.Inf)

    findResult should equal(List())
  }
  
    test("findAll with single location and SKU but multiple records") {
    val inventoryTable = TableQuery[InventoryDoubleRecords] ++= Seq(
      InventoryDoubleRecord(None, TEST_SKU, 1, TRANSACTION.ADJUST, BIN_01),
      InventoryDoubleRecord(None, TEST_SKU, 3, TRANSACTION.ADJUST, BIN_01),
      InventoryDoubleRecord(None, TEST_SKU, -1, TRANSACTION.ADJUST, BIN_01)
    )
    Await.result(database.run(inventoryTable), Duration.Inf)

    val futureFind = InventoryDoubleRecordDao.findAll(database)
    val findResult: Seq[(String, String, Option[Int])] = Await.result(futureFind, Duration.Inf)

    findResult should equal(List((TEST_SKU, BIN_01, Some(3))))
  }

  test("findAll with multiple location+SKU and multiple records") {
    val inventoryTable = TableQuery[InventoryDoubleRecords] ++= Seq(
      InventoryDoubleRecord(None, TEST_SKU, 1, TRANSACTION.ADJUST, BIN_01),
      InventoryDoubleRecord(None, TEST_SKU, 3, TRANSACTION.ADJUST, BIN_02),
      InventoryDoubleRecord(None, TEST_SKU, -1, TRANSACTION.ADJUST, BIN_01)
    )
    Await.result(database.run(inventoryTable), Duration.Inf)

    val futureFind = InventoryDoubleRecordDao.findAll(database)
    val findResult: Seq[(String, String, Option[Int])] = Await.result(futureFind, Duration.Inf)

    findResult should contain only ((TEST_SKU, BIN_02, Some(3)), (TEST_SKU, BIN_01, Some(0)))
  }
}

This should look very similar to our original example (which modeled each location and SKU relationship with a single DB record), except that we now do a SQL aggregation on the results to find all the current inventory information.

  def findAll(db: PostgresProfile.backend.DatabaseDef): Future[Seq[(String, String, Option[Int])]] = {
    val groupByQuery = this.groupBy(x => (x.sku, x.location))
      .map{ case ((sku, location), group) => (sku, location, group.map(_.qty).sum) }
      .result
    db.run(groupByQuery)
  }

This Slick query is roughly equivalent to the following SQL:

SELECT sku, location, SUM(qty)
FROM inventory_double
GROUP BY sku, location

Why the SQL aggregation and tuple return type? How we model the record in the database with the class InventoryDoubleRecord is now somewhat abstracted from how our business logic might want to handle the data. The InventoryDoubleRecord class has id and txnType columns which are not necessarily applicable when a client is asking the service how many items are in a location or how many units of a SKU are available. Hence we return a tuple from our findAll function; the reader could map this to a new class if they preferred.
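
For example, a caller who prefers a domain type could map the tuples at the edge. In this sketch InventoryLevel is a hypothetical class introduced purely for illustration (it is not part of the service), and the implicit global ExecutionContext imported in the DAO file is assumed:

  // Hypothetical shape for business logic. The Option[Int] comes from Slick's
  // typing of sum; a GROUP BY result always has rows in each group, so the
  // getOrElse(0) is just belt and braces.
  case class InventoryLevel(sku: String, location: String, qty: Int)

  def findAllLevels(db: PostgresProfile.backend.DatabaseDef): Future[Seq[InventoryLevel]] =
    InventoryDoubleRecordDao.findAll(db).map(_.map {
      case (sku, location, qty) => InventoryLevel(sku, location, qty.getOrElse(0))
    })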

Create/Insert/Update Logic

Now we want the ability to create new inventory, and update the qty of an existing location + SKU. You might refer to this sort of logic as an adjustment (which I track via the TRANSACTION enum). Where previously we could take a lock on a single record and do an atomic create or update:

INSERT INTO inventory_single (sku, qty, location)
VALUES ('SKU-01', 3, 'LOC-01')
ON CONFLICT ON CONSTRAINT inventory_single_sku_location_key
DO UPDATE SET qty = EXCLUDED.qty;

Now we have to consider multiple rows in order to find out the current quantity of a location and SKU. Our initial function definition could look like:

  def create(db: PostgresProfile.backend.DatabaseDef,
             sku: String,
             qty: Int,
             location: String
            ): Future[InventoryDoubleRecord] = {
    val insert = for {
      // Find the current count for this Location and SKU
      initGroupBy <- // TODO - query to find existing qty

      // Insert a new record that will result in the desired qty
      _ <- // TODO - insert a new record

      // Return the updated value
      newRecordBySku <- // TODO return the current qty for the SKU and Location
    } yield newRecordBySku

    db.run(insert.transactionally)
  }

The structure we just outlined uses a Slick transaction and for-comprehension to group a set of queries (lookup, insert, and lookup). We start with the lookup component.

  • Lookup existing qty - find the qty for the Location/SKU pair if it exists
    • A basic Slick aggregation with sum will give us a result of something like Future[Seq[(String, String, Option[Int])]]. Given our schema design, the filtered query produces at most a single aggregated row, so we can use headOption to grab the head of the sequence.
      this.filter(_.sku === sku).filter(_.location === location)
        .groupBy(x => x.sku)
        .map{ case (_, group) => (sku, group.map(_.qty).sum) }
        .result.headOption

  • Insert - create a new database record.
    • Case 1 - We found no existing records for the SKU and Location pair.
    • Case 2 - We have an existing record, and its qty is needed to determine the qty of the new record to insert.
      _ <- {
        initGroupBy match {
          case None =>
            this += InventoryDoubleRecord(None, sku, qty, TRANSACTION.ADJUST, location)
          case Some((_, Some(existingQty))) =>
            logger.debug(s"FOUND record $qty - $existingQty")
            this += InventoryDoubleRecord(None, sku, qty - existingQty, TRANSACTION.ADJUST, location)
          case _ => DBIO.failed(new Exception("Insert for create failed"))
        }
      }

  • Lookup current quantity - get the current qty for the Location/SKU pair after our insert.
    • I made an effort to use InventoryDoubleRecord as the return type so we could more easily draw a parallel with our single record schema design. It also illustrates an example of mapping the results (even if I am not personally a fan of this code).
      newRecordGroupBy <- this.filter(_.sku === sku).filter(_.location === location)
        .groupBy(x => x.sku).map{ case (_, group) => (sku, group.map(_.qty).sum) }
        .result.headOption
      newRecordBySku <- {
        newRecordGroupBy match {
          case Some((_, Some(newQty))) =>
            DBIO.successful(InventoryDoubleRecord(None, sku, newQty, TRANSACTION.ADJUST, location))
          case _ =>
            DBIO.failed(new Exception("Insert for create failed"))
        }
      }

We want some tests to cover this logic as well. These are not beautiful or fully comprehensive tests, but hopefully they are illustrative of the logic we are testing in the DAO. They are a bit on the verbose side.

  test("create when empty") {
    val futureCreate = InventoryDoubleRecordDao.create(database, TEST_SKU, 2, BIN_01)
    val createResult = Await.result(futureCreate, Duration.Inf)
    createResult should equal(InventoryDoubleRecord(None, TEST_SKU, 2, TRANSACTION.ADJUST, BIN_01))

    val futureFind = InventoryDoubleRecordDao.findAllRaw(database)
    val findResult: Seq[InventoryDoubleRecord] = Await.result(futureFind, Duration.Inf)
    findResult should equal(List(InventoryDoubleRecord(Option(1), TEST_SKU, 2, TRANSACTION.ADJUST, BIN_01)))
  }

  test("create with existing record") {
    Await.result(InventoryDoubleRecordDao.create(database, TEST_SKU, 2, BIN_01), Duration.Inf)

    val futureUpdate = InventoryDoubleRecordDao.create(database, TEST_SKU, 1, BIN_01)
    val updateResult = Await.result(futureUpdate, Duration.Inf)
    updateResult should equal(InventoryDoubleRecord(None, TEST_SKU, 1, TRANSACTION.ADJUST, BIN_01))

    val futureFindAllRaw = InventoryDoubleRecordDao.findAllRaw(database)
    val findResult: Seq[InventoryDoubleRecord] = Await.result(futureFindAllRaw, Duration.Inf)
    findResult should equal(List(
      InventoryDoubleRecord(Option(1), TEST_SKU, 2, TRANSACTION.ADJUST, BIN_01),
      InventoryDoubleRecord(Option(2), TEST_SKU, -1, TRANSACTION.ADJUST, BIN_01)
    ))
  }

Final Create/Insert/Update Code

This gives us the following create function:

  def create(db: PostgresProfile.backend.DatabaseDef,
             sku: String,
             qty: Int,
             location: String
            ): Future[InventoryDoubleRecord] = {
    val insert = for {
      // Find the current count for this location and SKU
      initGroupBy <- this.filter(_.sku === sku).filter(_.location === location)
        .groupBy(x => x.sku).map{ case (_, group) => (sku, group.map(_.qty).sum) }
        .result.headOption

      // Insert a new record that will result in the desired qty
      _ <- {
        initGroupBy match {
          case None =>
            this += InventoryDoubleRecord(None, sku, qty, TRANSACTION.ADJUST, location)
          case Some((_, Some(existingQty))) =>
            logger.debug(s"FOUND record $qty - $existingQty")
            this += InventoryDoubleRecord(None, sku, qty - existingQty, TRANSACTION.ADJUST, location)
          case _ => DBIO.failed(new Exception("Insert for create failed"))
        }
      }

      // Return the updated value
      newRecordGroupBy <- this.filter(_.sku === sku).filter(_.location === location)
        .groupBy(x => x.sku).map{ case (_, group) => (sku, group.map(_.qty).sum) }
        .result.headOption
      newRecordBySku <- {
        newRecordGroupBy match {
          case Some((_, Some(newQty))) =>
            DBIO.successful(InventoryDoubleRecord(None, sku, newQty, TRANSACTION.ADJUST, location))
          case _ =>
            DBIO.failed(new Exception("Insert for create failed"))
        }
      }
    } yield newRecordBySku

    db.run(insert.transactionally)
  }
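
To exercise this over HTTP (as we will in the next section), the service exposes a POST /double route. Here is a hedged sketch of what that route might look like; the servlet name ToyInventory is inferred from log output later in this post, and CreateInventory is a request class introduced just for this example:

import org.bitbucket.honstain.inventory.dao.InventoryDoubleRecordDao
import org.json4s.{DefaultFormats, Formats}
import org.scalatra.ScalatraServlet
import org.scalatra.json.JacksonJsonSupport
import slick.jdbc.PostgresProfile

import scala.concurrent.Await
import scala.concurrent.duration.Duration

// Shape of the JSON payload sent by the client.
case class CreateInventory(sku: String, qty: Int, location: String)

class ToyInventory(val db: PostgresProfile.backend.DatabaseDef) extends ScalatraServlet
  with JacksonJsonSupport {

  protected implicit lazy val jsonFormats: Formats = DefaultFormats

  before() {
    contentType = formats("json")
  }

  post("/double") {
    // Parse {"sku": ..., "qty": ..., "location": ...} and delegate to the DAO
    val req = parsedBody.extract[CreateInventory]
    Await.result(InventoryDoubleRecordDao.create(db, req.sku, req.qty, req.location), Duration.Inf)
  }
}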

The reader may notice that the create function is susceptible to consistency problems when we have concurrent calls. We have not set an isolation level for our transaction, and just like our problems with inventory transfer http://honstain.com/inventory-transfer-row-locking/ we can get undesirable behavior if multiple create requests for a given location/SKU overlap.
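
You do not even need a load-testing tool to see the race; a quick test sketch (assuming the test fixtures from earlier plus an import of scala.concurrent.Future) can surface it:

  test("concurrent create calls can race") {
    // Both creates may read the same starting qty under the default READ COMMITTED
    // isolation, so the final sum can land on 18 rather than the expected 1 or 17.
    val f1 = InventoryDoubleRecordDao.create(database, "SKU-01", 1, "LOC-01")
    val f2 = InventoryDoubleRecordDao.create(database, "SKU-01", 17, "LOC-01")
    Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)

    val levels = Await.result(InventoryDoubleRecordDao.findAll(database), Duration.Inf)
    println(levels) // Nondeterministic: may show Some(18) instead of Some(1) or Some(17)
  }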

Demonstrating the Consistency Problem

If you use siege to pound on the service with a few concurrent users, you can pretty quickly observe some undesirable behavior.

siege -v -c2 -r10 --content-type "application/json" -f siege_urls.txt

#### siege_urls.txt ####
127.0.0.1:8080/double
127.0.0.1:8080/double POST {"sku": "SKU-01","qty": 1,"location": "LOC-01"}
127.0.0.1:8080/double POST {"sku": "SKU-01","qty": 17,"location": "LOC-01"}

We would hope to only ever see a quantity of 1 or 17 (with corresponding adjustment records in the DB). Instead, overlapping requests can leave records like these:

id   sku     qty  type    location
448  SKU-01    1  adjust  LOC-01
449  SKU-01    1  adjust  LOC-01
450  SKU-01   15  adjust  LOC-01
451  SKU-01   15  adjust  LOC-01
452  SKU-01  -31  adjust  LOC-01
453  SKU-01    0  adjust  LOC-01
#### Example Logging from Scalatra Service ####
10:41:52.365 [scala-execution-context-global-35] DEBUG o.b.h.i.d.InventoryDoubleRecordDao$ - INSERT need qty: 17 existing: 2 for sku: SKU-01 location: LOC-01
10:41:52.372 [scala-execution-context-global-35] DEBUG o.b.h.i.d.InventoryDoubleRecordDao$ - INSERT need qty: 17 existing: 2 for sku: SKU-01 location: LOC-01
10:41:52.387 [qtp1637506559-12] DEBUG o.b.h.inventory.app.ToyInventory - GET: location: SKU-01 sku: LOC-01 qty: Some(17)
10:41:56.813 [qtp1637506559-17] DEBUG o.b.h.inventory.app.ToyInventory - GET: location: SKU-01 sku: LOC-01 qty: Some(32)
10:41:56.818 [scala-execution-context-global-39] DEBUG o.b.h.i.d.InventoryDoubleRecordDao$ - INSERT need qty: 1 existing: 32 for sku: SKU-01 location: LOC-01
10:41:56.827 [scala-execution-context-global-39] DEBUG o.b.h.i.d.InventoryDoubleRecordDao$ - INSERT need qty: 1 existing: 1 for sku: SKU-01 location: LOC-01
10:41:56.839 [scala-execution-context-global-38] DEBUG o.b.h.i.d.InventoryDoubleRecordDao$ - INSERT need qty: 17 existing: 1 for sku: SKU-01 location: LOC-01
10:41:56.862 [scala-execution-context-global-38] DEBUG o.b.h.i.d.InventoryDoubleRecordDao$ - INSERT need qty: 17 existing: 17 for sku: SKU-01 location: LOC-01

I would encourage you to play with several different isolation levels and observe how PostgreSQL handles this sort of query (a read followed by an insert). Note that at the Serializable level PostgreSQL will abort one of two conflicting transactions with a serialization failure, so the caller needs to be prepared to retry.

db.run(insert.transactionally.withTransactionIsolation(TransactionIsolation.Serializable))

Summary of the Initial Create/Insert/Update Logic

We have created a basic design for tracking inventory using a DB schema based on double-entry accounting. It still has some gaps with consistency (just like our initial attempt that relied on a single DB record for each location and SKU pair), but hopefully this gives you some ideas.

Source code for this blog post: https://bitbucket.org/honstain/scalatra-double-record-transfer-service/src/master/

Increasing Consistency of the Create/Insert/Update Logic

One idea we can explore is taking a pessimistic lock on the location and SKU. What would that imply in our current schema?

  • Using a SELECT FOR UPDATE would mean that we lock all the records needed to compute the current value, which would be worse (in terms of DB overhead to support locking) than our schema that used a single record to track the quantity.

id   location  sku     qty  type
474  LOC-01    SKU-01    1  adjust
475  LOC-01    SKU-01    1  adjust
476  LOC-01    SKU-01    1  adjust
477  LOC-01    SKU-01    1  adjust
478  LOC-01    SKU-01   -3  adjust
479  LOC-01    SKU-01    0  adjust

We will instead create a new table just to support this locking behavior.

CREATE TABLE inventory_lock
(
  location text,
  sku text,
  revision integer,
  CONSTRAINT pk_lock PRIMARY KEY (location, sku)
);

class InventoryDoubleRecordLocks(tag: Tag) extends Table[(String, String, Int)](tag, "inventory_lock") {
  def location = column[String]("location")
  def sku = column[String]("sku")
  def revision = column[Int]("revision")
  def * = (location, sku, revision)
}

Given this additional table, here is one way we might include it in our create function's database transaction:

      createLock <- {
        TableQuery[InventoryDoubleRecordLocks].filter(x => x.location === location && x.sku === sku).forUpdate.result
      }

      _ <- {
        createLock match {
          case Seq((`location`, `sku`, _)) =>
            val updateLock = TableQuery[InventoryDoubleRecordLocks]
            val q = for { x <- updateLock if x.location === location && x.sku === sku } yield x.revision
            q.update(createLock.head._3 + 1)
          case _ =>
            // Create if no record lock existed
            TableQuery[InventoryDoubleRecordLocks] += (location, sku, 0)
        }
      }

This has two main pieces: first we attempt to read the lock record FOR UPDATE, then we create a new lock record if this is the first time the Location/SKU pair has been seen.
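
Putting the pieces together, here is a sketch of the create logic with the lock steps prepended (assuming the tables above; the repo's actual code may differ):

  def createWithLock(db: PostgresProfile.backend.DatabaseDef,
                     sku: String,
                     qty: Int,
                     location: String
                    ): Future[InventoryDoubleRecord] = {
    val locks = TableQuery[InventoryDoubleRecordLocks]
    val insert = for {
      // Take the pessimistic lock (or create it on first use) before reading inventory
      createLock <- locks.filter(x => x.location === location && x.sku === sku).forUpdate.result
      _ <- createLock match {
        case Seq((`location`, `sku`, revision)) =>
          // Bump the revision so the lock row is also written by this transaction
          locks.filter(x => x.location === location && x.sku === sku).map(_.revision).update(revision + 1)
        case _ =>
          // First time this Location/SKU pair has been seen
          locks += ((location, sku, 0))
      }

      // The original lookup/insert/lookup steps, now protected by the lock
      initGroupBy <- this.filter(_.sku === sku).filter(_.location === location)
        .groupBy(x => x.sku).map{ case (_, group) => (sku, group.map(_.qty).sum) }
        .result.headOption
      _ <- initGroupBy match {
        case None =>
          this += InventoryDoubleRecord(None, sku, qty, TRANSACTION.ADJUST, location)
        case Some((_, Some(existingQty))) =>
          this += InventoryDoubleRecord(None, sku, qty - existingQty, TRANSACTION.ADJUST, location)
        case _ => DBIO.failed(new Exception("Insert for create failed"))
      }
      newRecordGroupBy <- this.filter(_.sku === sku).filter(_.location === location)
        .groupBy(x => x.sku).map{ case (_, group) => (sku, group.map(_.qty).sum) }
        .result.headOption
      newRecordBySku <- newRecordGroupBy match {
        case Some((_, Some(newQty))) =>
          DBIO.successful(InventoryDoubleRecord(None, sku, newQty, TRANSACTION.ADJUST, location))
        case _ => DBIO.failed(new Exception("Insert for create failed"))
      }
    } yield newRecordBySku

    db.run(insert.transactionally)
  }

One caveat: the very first create for a given pair can still race on inserting the lock row itself; the primary key on (location, sku) turns that into an error for one of the two transactions rather than a silent inconsistency.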

I make no claim that this is the best solution to the problem, but it illustrates one way to maintain consistency. This solution does not make use of foreign key constraints or joins.

Summary

We have taken a tour of Scalatra and Slick while implementing a very rudimentary service for tracking physical inventory (tracking quantities of a SKU by location). There are many ways you could solve this problem, and I have tried to outline some of the options and their trade-offs.

The primary goal of this set of blog posts on this toy inventory system was to learn and share (I was exploring as I went). I am still inexperienced with Scala and its ecosystem (while being reasonably comfortable with PostgreSQL).

I found Scalatra and Slick reasonably difficult to adapt to, probably because I am still trying to write Java and JDBI. I humbly admit my weakness here.

I had originally set out to compare the transfer logic between the single record design (one DB record to track the qty of a location/SKU pair) with the double-entry model covered in this post. I have implemented a transfer function (which you can find here https://bitbucket.org/honstain/scalatra-double-record-inventory) but I will not create a special blog post to cover it. I think the create example here is sufficient to illustrate the logic and I would like to move on from Scalatra and experiment with the Play framework https://www.playframework.com/.

Want to Know More About Double-Entry Accounting?

Some initial references that are worth considering if you would like to further familiarize yourself with double-entry accounting:

  • Martin Fowler's accounting patterns narrative: https://www.martinfowler.com/eaaDev/AccountingNarrative.html
