Overview

In our previous set of posts, we added progressively more functionality to our basic Scalatra service.

Now we would like to explore implementing a system for tracking inventory. What you will get from this post:

  • Creating a DB model for a simplified service (tracking inventory)
  • Using Slick to query that data
  • Using Slick to insert data
  • Using Slick and PostgreSQL to upsert
  • Tests for the new logic.

Creating a Service to Track Inventory Levels

Create the Database Schema

Let's start with a very basic schema that can track a product/SKU (if you are unfamiliar with the SKU terminology I recommend reading https://en.wikipedia.org/wiki/Stock_keeping_unit), a location, and a quantity. A location could have multiple SKU's in it, and a SKU can be in more than one location.

CREATE TABLE inventory_single
(
  id bigserial NOT NULL,
  sku text,
  qty integer,
  location text,
  CONSTRAINT pk_single PRIMARY KEY (id),
  UNIQUE (sku, location)
);

We can create a few example pieces of inventory:

INSERT INTO inventory_single(sku, qty, location) VALUES
('SKU-01', 2, 'LOC-01'),
('SKU-01', 0, 'LOC-02')
;

Create a DAO for Slick

Now that we have our database schema, lets set up our Scala code to access it. I have chosen here to abstract my database access and Slick queries with DAO object. I referred to this when trying to organize my DAO https://sap1ens.com/blog/2015/07/26/scala-slick-3-how-to-start/, I also found this helpful https://reactore.com/repository-patterngeneric-dao-implementation-in-scala-using-slick-3/.

import org.slf4j.{Logger, LoggerFactory}
import slick.jdbc.{PostgresProfile, TransactionIsolation}
import slick.jdbc.PostgresProfile.api._

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

case class InventorySingleRecord(
                                  id: Option[Int],
                                  sku: String,
                                  qty: Int,
                                  location: String
                                )

class InventorySingleRecords(tag: Tag) extends Table[InventorySingleRecord](tag, "inventory_single") {
  def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
  def sku = column[String]("sku")
  def qty = column[Int]("qty")
  def location = column[String]("location")
  def * =
    (id.?, sku, qty, location) <> (InventorySingleRecord.tupled, InventorySingleRecord.unapply)
}

object InventorySingleRecordDao extends TableQuery(new InventorySingleRecords(_)) {

  val logger: Logger = LoggerFactory.getLogger(getClass)

  def findAll(db: PostgresProfile.backend.DatabaseDef): Future[Seq[InventorySingleRecord]] = {
    db.run(this.result)
  }
}

This gives us a basic DAO that initial supports just a single query to return every record. Then if we want to retrieve the data we would get a Future for all the records in the database val futureResult = Await.result(singleDAO.findAll(database), Duration.Inf)

Testing the New DAO

One of the benefits of organizing our code this way, is the ability to test our database queries in isolation (something I found helpful while experimenting with Slick).

WARNING - the repeated create and drop, is intended to be simple (to understand for the reader) at the cost of being relatively slow and inefficient.

Before writing our tests, we could use a trait to set up a database just for our testing (and avoid clobbering the database used to run the service).

import org.scalatest.{BeforeAndAfterAll, Suite}

import slick.dbio.DBIO
import slick.jdbc.PostgresProfile.api._
import scala.concurrent.Await
import scala.concurrent.duration.Duration

trait PostgresSpec extends Suite with BeforeAndAfterAll {

  private val dbName = getClass.getSimpleName.toLowerCase
  private val driver = "org.postgresql.Driver"

  private val postgres = Database.forURL("jdbc:postgresql://localhost:5432/?user=<TODO-YOUR-USER>&password=<TODO-YOUR-PASSWORD>", driver = driver)

  def dropDB: DBIO[Int] = sqlu"DROP DATABASE IF EXISTS #$dbName"
  def createDB: DBIO[Int] = sqlu"CREATE DATABASE #$dbName"

  override def beforeAll(): Unit = {
    super.beforeAll()
    Await.result(postgres.run(dropDB), Duration.Inf)
    Await.result(postgres.run(createDB), Duration.Inf)
  }

  override def afterAll() {
    super.afterAll()
    Await.result(postgres.run(dropDB), Duration.Inf)
  }

  val database = Database.forURL(s"jdbc:postgresql://localhost:5432/$dbName?user=<TODO-YOUR-USER>&password=<TODO-YOUR-PASSWORD>", driver = driver)
}

This has a connection just for creating a special test database and tearing it down, along with a reference database for you to use in your test suite class.

import org.bitbucket.honstain.PostgresSpec
import org.scalatest.BeforeAndAfter
import org.scalatra.test.scalatest._
import slick.dbio.DBIO
import slick.jdbc.PostgresProfile.api._

import scala.concurrent.Await
import scala.concurrent.duration.Duration


class InventorySingleRecordDaoTests extends ScalatraFunSuite with BeforeAndAfter with PostgresSpec {

  def createInventoryTable: DBIO[Int] =
    sqlu"""
          CREATE TABLE inventory_single
          (
            id bigserial NOT NULL,
            sku text,
            qty integer,
            location text,
            CONSTRAINT pk_single PRIMARY KEY (id),
            UNIQUE (sku, location)
          );
      """
  def dropInventoryTable: DBIO[Int] =
    sqlu"""
          DROP TABLE IF EXISTS inventory_single;
      """

  before {
    Await.result(database.run(createInventoryTable), Duration.Inf)
  }

  after {
    Await.result(database.run(dropInventoryTable), Duration.Inf)
  }

  val TEST_SKU = "NewSku"
  val BIN_01 = "Bin-01"
  val BIN_02 = "Bin-02"

  test("findAll") {
    val futureFind = InventorySingleRecordDao.findAll(database)
    val findResult: Seq[InventorySingleRecord] = Await.result(futureFind, Duration.Inf)

    findResult should equal(List())
  }
}

Create New Inventory

Now that we have a very basic schema for the DB, we want to be able to create new records and inventory. Many of the Slick 3.0 examples I have seen only address the basic insert. We would like to go a bit further and support create and update logic, giving the logic that uses this DAO call the ability to create/destroy/modify inventory levels for a given location and SKU. But to start with let's do the most basic thing and build up.

Our first test could look something like this (note that the Slick documentation can be a good additional reference http://slick.lightbend.com/doc/3.3.0/queries.html#inserting)

    test("create single record and use DAO to validate") {
      val future = InventorySingleRecordDao.create(database, TEST_SKU, 1, BIN_01)
      val result: Int = Await.result(future, Duration.Inf)
      // The expected result is just a count of the number of rows impacted.
      result should equal(1)

      // Validate that changes were persisted, in this case we will use a DAO
      // function we previously created to help us validate our new one.
      val futureFind = InventorySingleRecordDao.findAll(database)
      val findResult: Seq[InventorySingleRecord] = Await.result(futureFind, Duration.Inf)
      findResult should contain only InventorySingleRecord(Some(1), TEST_SKU, 1, BIN_01)
    }

If you would prefer not to compose elements of the DAO in tests (to use with setup and decomposition), you could do the following by using new Slick queries for the test validation (inspecting the changes to the database).

    test("create single record and check slick query") {
      val future = InventorySingleRecordDao.create(database, TEST_SKU, 1, BIN_01)
      val result: Int = Await.result(future, Duration.Inf)
      // The expected result is just a count of the number of rows impacted.
      result should equal(1)

      // Validate that changes were persisted
      val inventoryTable = TableQuery[InventorySingleRecords]
      val futureFind = database.run(inventoryTable.result)
      val findResult: Seq[InventorySingleRecord] = Await.result(futureFind, Duration.Inf)
      findResult should contain only InventorySingleRecord(Some(1), TEST_SKU, 1, BIN_01)
    }

We can then add the following create method to our DAO

  def create(db: PostgresProfile.backend.DatabaseDef,
               sku: String,
               qty: Int,
               location: String
              ): Future[Int] = {
    val query = TableQuery[InventorySingleRecords] += InventorySingleRecord(Option.empty, sku, qty, location)
    db.run(query)
  }

This is a good starting point, but you will notice that it is fairly restrictive, we would only be able to create records if none existed (because of the unique constraint we previously placed on inventory and SKU columns). I would suggest experimenting with a test to prove this to yourself.

  test("create when unique constraint violated") {
    val resultFirst: Int = Await.result(
      InventorySingleRecordDao.create(database, TEST_SKU, 1, BIN_01),
      Duration.Inf
    )
    resultFirst should equal(1)

    // This naive test, will result in a PSQLException
    Await.result(
      InventorySingleRecordDao.create(database, TEST_SKU, 1, BIN_01),
      Duration.Inf
    )
  }

Unfortunately, the stack trace I got, was not very helpful in understanding where the problem occurred in our code base, but we can improve on error handling later.

ERROR: duplicate key value violates unique constraint "inventory_single_sku_location_key"
  Detail: Key (sku, location)=(NewSku, Bin-01) already exists.
org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "inventory_single_sku_location_key"
  Detail: Key (sku, location)=(NewSku, Bin-01) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:308)
	at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365)
	at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:143)
	at org.postgresql.jdbc.PgPreparedStatement.executeUpdate(PgPreparedStatement.java:120)
	at slick.jdbc.JdbcActionComponent$InsertActionComposerImpl$SingleInsertAction.$anonfun$run$15(JdbcActionComponent.scala:522)
	at slick.jdbc.JdbcBackend$SessionDef.withPreparedStatement(JdbcBackend.scala:425)
	at slick.jdbc.JdbcBackend$SessionDef.withPreparedStatement$(JdbcBackend.scala:420)
	at slick.jdbc.JdbcBackend$BaseSession.withPreparedStatement(JdbcBackend.scala:489)
	at slick.jdbc.JdbcActionComponent$InsertActionComposerImpl.preparedInsert(JdbcActionComponent.scala:513)
	at slick.jdbc.JdbcActionComponent$InsertActionComposerImpl$SingleInsertAction.run(JdbcActionComponent.scala:519)
	at slick.jdbc.JdbcActionComponent$SimpleJdbcProfileAction.run(JdbcActionComponent.scala:30)
	at slick.jdbc.JdbcActionComponent$SimpleJdbcProfileAction.run(JdbcActionComponent.scala:27)
	at slick.basic.BasicBackend$DatabaseDef$$anon$3.liftedTree1$1(BasicBackend.scala:275)
	at slick.basic.BasicBackend$DatabaseDef$$anon$3.run(BasicBackend.scala:275)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Create with Update

Now we want to take our current create logic and to go one step further, allowing the caller to also update the volume of an existing record (upsert). Let's start by modifying our test to expect this new behavior.

  test("create with update") {
    val future = InventorySingleRecordDao.create(database, TEST_SKU, 1, BIN_01)
    Await.result(future, Duration.Inf)

    val futureUpdate = InventorySingleRecordDao.create(database, TEST_SKU, 3, BIN_01)
    val resultUpdate: Int = Await.result(futureUpdate, Duration.Inf)
    resultUpdate should equal(1)

    // Validate that changes were persisted
    val inventoryTable = TableQuery[InventorySingleRecords]
    val futureFind = database.run(inventoryTable.result)
    val findResult: Seq[InventorySingleRecord] = Await.result(futureFind, Duration.Inf)
    findResult should contain only InventorySingleRecord(Some(1), TEST_SKU, 3, BIN_01)
  }

With the test in place, we can implement our changes. Before we implement the Slick query it is worth reviewing what our specific database supports. In our case, PostgreSQL https://www.postgresql.org/docs/9.5/sql-insert.html supports upsert behavior with the "ON CONFLICT" clause. We could use a SQL query like this to get an atomic upsert.

INSERT INTO inventory_single (sku, qty, location)
VALUES ('SKU-01', 3, 'LOC-01')
ON CONFLICT ON CONSTRAINT inventory_single_sku_location_key
DO UPDATE SET qty = EXCLUDED.qty;

The corresponding raw SQL implemented in Slick (this is a useful reference http://slick.lightbend.com/doc/3.3.0/sql.html) would be:

  def create(db: PostgresProfile.backend.DatabaseDef,
               sku: String,
               qty: Int,
               location: String
              ): Future[Int] = {
    //val query = TableQuery[InventorySingleRecords] += InventorySingleRecord(Option.empty, sku, qty, location)
    val query: DBIO[Int] =
      sqlu"""
           INSERT INTO inventory_single (sku, qty, location)
           VALUES ($sku, $qty, $location)
           ON CONFLICT ON CONSTRAINT inventory_single_sku_location_key
              DO UPDATE SET qty = EXCLUDED.qty;
        """
    db.run(query)
  }

That now gives us the ability to create/update quantities for a SKU and location while working within the existing constraints (we said the sku,location needed to be unique). You may not prefer the raw SQL, but we will explore alternatives in the next section.

Create/Update and Return the New/Updated Record

The create DAO only returns an Int to indicate if a row was modified, we would now like to return the new value of the record pending the create/update. We can modify our SQL by adding the RETURNING clause to the query.

INSERT INTO inventory_single (sku, qty, location)
VALUES ('SKU-01', 3, 'LOC-01')
ON CONFLICT ON CONSTRAINT inventory_single_sku_location_key
DO UPDATE SET qty = EXCLUDED.qty
RETURNING sku, qty, location;

Which means we then need to update the types in our DAO and it's tests.

  test("create") {
    val future = InventorySingleRecordDao.create(database, TEST_SKU, 1, BIN_01)
    val result: Seq[(String, Int, String)] = Await.result(future, Duration.Inf)
    // The expected result is just a count of the number of rows impacted.
    result should contain only((TEST_SKU, 1, BIN_01))

    // Validate that changes were persisted
    val inventoryTable = TableQuery[InventorySingleRecords]
    val futureFind = database.run(inventoryTable.result)
    val findResult: Seq[InventorySingleRecord] = Await.result(futureFind, Duration.Inf)
    findResult should contain only InventorySingleRecord(Some(1), TEST_SKU, 1, BIN_01)
  }

  test("create with update") {
    val future = InventorySingleRecordDao.create(database, TEST_SKU, 1, BIN_01)
    Await.result(future, Duration.Inf)

    val futureUpdate = InventorySingleRecordDao.create(database, TEST_SKU, 3, BIN_01)
    val resultUpdate: Seq[(String, Int, String)] = Await.result(futureUpdate, Duration.Inf)
    resultUpdate should contain only((TEST_SKU, 3, BIN_01))

    // Validate that changes were persisted
    val inventoryTable = TableQuery[InventorySingleRecords]
    val futureFind = database.run(inventoryTable.result)
    val findResult: Seq[InventorySingleRecord] = Await.result(futureFind, Duration.Inf)
    findResult should contain only InventorySingleRecord(Some(1), TEST_SKU, 3, BIN_01)
  }

The create query in the DAO then becomes:

  def create(db: PostgresProfile.backend.DatabaseDef,
               sku: String,
               qty: Int,
               location: String
              ): Future[Seq[(String, Int, String)]] = {
    val query: DBIO[Seq[(String, Int, String)]] =
      sql"""
           INSERT INTO inventory_single (sku, qty, location)
           VALUES ($sku, $qty, $location)
           ON CONFLICT ON CONSTRAINT inventory_single_sku_location_key
              DO UPDATE SET qty = EXCLUDED.qty
           RETURNING sku, qty, location;
        """.as[(String, Int, String)]
    db.run(query)
  }

The astute reader will notice that we no longer have a mapping to the InventorySingleRecord case class that we set up. We started by just passing back a tuple of (String, Int, String) to represent the record.

Summary

We made it as far as creating the schema, reading from the DB and doing some progressively complicated upsert behavior. In the next posts, we will expand on this functionality and attempt to transfer inventory.