Spring and Redis Streams Intro
This will be a quick walkthrough of standing up a very basic Spring Boot Kotlin service that can consume from a Redis Stream. We will also take a brief look at using RedisInsight as part of our local docker setup.
What you will get in this post:
- Stand up Redis 6 using docker-compose
- Create a basic Spring Boot service (we are going to skip logging/alerting/testing)
- Create a consumer for the Redis stream
- Manually publish to the stream and inspect the state of the stream using RedisInsight.
This isn't meant to be a production system; it's really just a prototype used for experimenting with Redis Streams and the corresponding spring-data-redis and Lettuce libraries.
Key technologies at play here:
- Kotlin 1.6.10 https://kotlinlang.org/docs/releases.html#release-details
- Spring Boot version 2.6.3 https://spring.io/blog/2022/01/20/spring-boot-2-6-3-is-now-available
- spring-data-redis https://spring.io/projects/spring-data-redis
- Redis Streams via Redis 6.2.6 https://raw.githubusercontent.com/redis/redis/6.2/00-RELEASENOTES
- redisinsight https://redis.com/redis-enterprise/redis-insight/
- Docker https://docs.docker.com/engine/
- docker-compose https://docs.docker.com/compose/
- NOTE - the operating system I used was Ubuntu 20.04.2
Setup Redis Streams with Docker Compose
We start with a very basic docker-compose file:
version: "3.9"
# https://docs.docker.com/compose/compose-file/compose-versioning/
services:
  redis:
    # Reference:
    #   https://hub.docker.com/_/redis
    hostname: redis
    image: "redis:alpine"
    ports:
      - "6379:6379"
  redisinsight:
    # Reference:
    #   https://docs.redis.com/latest/ri/installing/install-docker/
    #
    # REMEMBER - to connect to the redis database, use the host: "redis"
    image: "redislabs/redisinsight:latest"
    ports:
      - "8001:8001"
Start everything with docker-compose up:
❯ docker-compose up
Starting streams_redis_1        ... done
Starting streams_redisinsight_1 ... done
Attaching to streams_redisinsight_1, streams_redis_1
redisinsight_1  | Process 9 died: No such process; trying to remove PID file. (/run/avahi-daemon//pid)
redis_1         | 1:C 13 Feb 2022 16:40:04.068 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
redis_1         | 1:C 13 Feb 2022 16:40:04.068 # Redis version=6.2.6, bits=64, commit=00000000, modified=0, pid=1, just started
redis_1         | 1:C 13 Feb 2022 16:40:04.068 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
redis_1         | 1:M 13 Feb 2022 16:40:04.068 * monotonic clock: POSIX clock_gettime
redis_1         | 1:M 13 Feb 2022 16:40:04.069 * Running mode=standalone, port=6379.
redis_1         | 1:M 13 Feb 2022 16:40:04.069 # Server initialized
redis_1         | 1:M 13 Feb 2022 16:40:04.069 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
redis_1         | 1:M 13 Feb 2022 16:40:04.070 * Loading RDB produced by version 6.2.6
redis_1         | 1:M 13 Feb 2022 16:40:04.070 * RDB age 63 seconds
redis_1         | 1:M 13 Feb 2022 16:40:04.070 * RDB memory usage when created 0.77 Mb
redis_1         | 1:M 13 Feb 2022 16:40:04.070 # Done loading RDB, keys loaded: 0, keys expired: 0.
redis_1         | 1:M 13 Feb 2022 16:40:04.070 * DB loaded from disk: 0.000 seconds
redis_1         | 1:M 13 Feb 2022 16:40:04.070 * Ready to accept connections
You should now be able to connect to RedisInsight at http://localhost:8001/




Let's create a stream and a consumer group using the redis-cli:
❯ docker ps
CONTAINER ID   IMAGE                           COMMAND                  CREATED          STATUS         PORTS                                       NAMES
aca29cb3864a   redis:alpine                    "docker-entrypoint.s…"   11 minutes ago   Up 4 minutes   0.0.0.0:6379->6379/tcp, :::6379->6379/tcp   streams_redis_1
a5f043666ef5   redislabs/redisinsight:latest   "bash ./docker-entry…"   11 minutes ago   Up 4 minutes   0.0.0.0:8001->8001/tcp, :::8001->8001/tcp   streams_redisinsight_1
❯ docker exec -it aca29cb3864a sh
/data # redis-cli
127.0.0.1:6379> XADD mystream * sensor-id 1234 temperature 14.0
"1644771139891-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
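The string returned by XADD above is the entry ID that Redis generated because we passed * as the ID. Per the Redis Streams documentation, an auto-generated ID has the form <millisecondsTime>-<sequenceNumber>. A minimal Kotlin sketch that splits such an ID apart might look like the following; StreamEntryId and parseEntryId are hypothetical helper names used only for illustration and are not part of spring-data-redis.

```kotlin
import java.time.Instant

// Hypothetical helper type for illustration; not part of spring-data-redis.
data class StreamEntryId(val timestamp: Instant, val sequence: Long)

// Splits "<millisecondsTime>-<sequenceNumber>" into its two parts.
fun parseEntryId(id: String): StreamEntryId {
    val parts = id.split("-")
    require(parts.size == 2) { "Expected <ms>-<seq>, got: $id" }
    return StreamEntryId(
        timestamp = Instant.ofEpochMilli(parts[0].toLong()),
        sequence = parts[1].toLong(),
    )
}

fun main() {
    // The ID returned by the XADD call shown earlier.
    val id = parseEntryId("1644771139891-0")
    println("timestamp=${id.timestamp} sequence=${id.sequence}")
}
```

The timestamp part is handy when eyeballing a stream in RedisInsight, since it tells you when Redis accepted each entry.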
We can now see the stream in redisinsight


Setup a Spring Boot Service
Create the initial skeleton
Using https://start.spring.io/ to initialize a service.

The project will be created with a HELP.md that looks like this, with links to some of the relevant documentation. 
# Getting Started
### Reference Documentation
For further reference, please consider the following sections:
* [Official Apache Maven documentation](https://maven.apache.org/guides/index.html)
* [Spring Boot Maven Plugin Reference Guide](https://docs.spring.io/spring-boot/docs/2.6.3/maven-plugin/reference/html/)
* [Create an OCI image](https://docs.spring.io/spring-boot/docs/2.6.3/maven-plugin/reference/html/#build-image)
* [Spring Data Redis (Access+Driver)](https://docs.spring.io/spring-boot/docs/2.6.3/reference/htmlsingle/#boot-features-redis)
### Guides
The following guides illustrate how to use some features concretely:
* [Messaging with Redis](https://spring.io/guides/gs/messaging-redis/)
Then import the application to IntelliJ


Now that we have the shell of our app, we need to create a connection to Redis and subscribe to the stream.
Redis Connection
I will start with the connection factory. I have opted to use Lettuce for my connection management.
If you want some resources to review for Lettuce and Jedis:
- https://redis.com/blog/jedis-vs-lettuce-an-exploration/
- https://docs.spring.io/spring-data/redis/docs/current/reference/html/#reference
- https://lettuce.io/
- https://github.com/redis/jedis
ConnectionFactory.kt
package com.example.StreamConsumerDemo2
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
@Configuration
class ConnectionFactory {
    @Bean
    fun redisConnectionFactory(): LettuceConnectionFactory {
        return LettuceConnectionFactory(
            RedisStandaloneConfiguration("localhost", 6379)
        )
    }
}
Redis Template and the Listener
This will be the code that processes each message. It's also where we will execute the manual ack to Redis.
MyStreamListener.kt
package com.example.StreamConsumerDemo2
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.core.StringRedisTemplate
import org.springframework.data.redis.stream.StreamListener
import org.springframework.stereotype.Component
@Component
class MyStreamListener(
    private val redisTemplate: StringRedisTemplate
): StreamListener<String, MapRecord<String, String, String>> {
    override fun onMessage(message: MapRecord<String, String, String>) {
        println("id: ${message.id} stream: ${message.stream} value: ${message.value}")
        redisTemplate.opsForStream<String, String>().acknowledge("mygroup", message)
    }
}
Container and Subscription
Next, we need to create a container and a subscription on the StreamMessageListenerContainer that uses our Streams consumer group.
Some of the important things in the StreamConsumer.kt to consider:
- pollTimeout on the StreamMessageListenerContainerOptions.
  - This is important because it controls how long the Lettuce client will poll for (which can impact shutdown).
- How we construct our Consumer, as this is where we specify the group and consumer.
  - This is important because you would control the consumer names here if you ended up creating multiple instances of this service.
  - I have been very casual in this code with the configuration of the stream, group, and consumer fields in an effort to minimize abstraction (and help the reader understand). You would want to organize this differently for production instead of for learning.
- How we construct StreamOffset.
  - This is important because it will determine how we start consuming from the stream when we start up.
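
To make the StreamOffset point a little more concrete, here is a small sketch that prints the raw offset string behind a few ReadOffset factory methods. It assumes spring-data-redis is on the classpath and does not connect to Redis. As I understand the Redis XREADGROUP documentation, ">" asks for entries never delivered to any consumer in the group, while an explicit ID such as "0-0" returns this consumer's pending (delivered but unacknowledged) entries after that ID. "$" is the same token we passed to XGROUP CREATE earlier.

```kotlin
import org.springframework.data.redis.connection.stream.ReadOffset
import org.springframework.data.redis.connection.stream.StreamOffset

fun main() {
    // Label -> ReadOffset, so we can see which raw token each factory produces.
    val offsets = mapOf(
        "lastConsumed()" to ReadOffset.lastConsumed(), // raw token ">"
        "latest()" to ReadOffset.latest(),             // raw token "$"
        "from(\"0-0\")" to ReadOffset.from("0-0"),     // an explicit entry ID
    )
    for ((label, readOffset) in offsets) {
        val streamOffset = StreamOffset.create("mystream", readOffset)
        println("ReadOffset.$label -> key=${streamOffset.key} offset=${streamOffset.offset.offset}")
    }
}
```

The consumer in this post uses ReadOffset.lastConsumed(), so on startup it only asks the group for entries that no consumer has been handed yet.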
 
StreamConsumer.kt
package com.example.StreamConsumerDemo2
import org.springframework.data.redis.connection.RedisConnectionFactory
import org.springframework.data.redis.connection.stream.Consumer
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.connection.stream.ReadOffset
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.stream.StreamMessageListenerContainer
import org.springframework.data.redis.stream.Subscription
import org.springframework.stereotype.Component
import java.time.Duration
@Component
class StreamConsumer(
    redisConnectionFactory: RedisConnectionFactory,
    streamListener: MyStreamListener,
) {
    final val POLL_TIMEOUT = 1000L
    final var container: StreamMessageListenerContainer<String, MapRecord<String, String, String>>
    final var subscription: Subscription
    init {
        val containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
            .pollTimeout(Duration.ofMillis(POLL_TIMEOUT))
            .build()
        container = StreamMessageListenerContainer.create(redisConnectionFactory, containerOptions)
        val consumer = Consumer.from("mygroup", "Alice")
        subscription = container.receive(
            consumer,
            StreamOffset.create("mystream", ReadOffset.lastConsumed()),
            streamListener
        )
        container.start()
    }
}
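The consumer name "Alice" is hard-coded above, which is fine for a single instance. If several instances of the service joined the same group, each would need its own name. One possible sketch, assuming a hypothetical environment variable STREAM_CONSUMER_NAME and falling back to the machine (or container) hostname, is shown below; resolveConsumerName is an illustrative helper, not part of spring-data-redis.

```kotlin
import java.net.InetAddress

// Hypothetical helper: picks a consumer name for this instance.
// STREAM_CONSUMER_NAME is an assumed variable name, not a Spring or Redis convention.
fun resolveConsumerName(
    env: Map<String, String> = System.getenv(),
    hostName: () -> String = { InetAddress.getLocalHost().hostName },
): String =
    env["STREAM_CONSUMER_NAME"]?.takeIf { it.isNotBlank() }
        // Hostname lookups can fail, so fall back to a fixed placeholder.
        ?: runCatching { hostName() }.getOrDefault("consumer-unknown")

fun main() {
    // An explicit name wins over the hostname.
    println(resolveConsumerName(mapOf("STREAM_CONSUMER_NAME" to "Alice")))
    // Without the variable, the hostname supplier is used.
    println(resolveConsumerName(emptyMap()) { "host-a" })
}
```

The result could then be passed to Consumer.from("mygroup", resolveConsumerName()). A stable name per instance seems preferable to a random one, since Redis keeps consumers and their pending entries in the group until they are removed.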
Experiment with the Kotlin Service
This will give you a service that can consume from the Redis Stream, ack messages, and print their content.
Start your service using IntelliJ Run/Debug Configurations

If you want to use the command line instead (note - you need maven):
# To Construct the artifact.
mvn clean package
# To run the JAR we created in the previous mvn package.
java -jar target/StreamConsumerDemo2-0.0.1-SNAPSHOT.jar 
Which should give you something like this.

By publishing some messages to the stream using the redis-cli (running in a docker container), you can see your service start consuming them. There are many ways to achieve this; I have chosen this method in an effort to leverage tools the reader already has (we already stood up Redis in a docker container in the first part of this tutorial).
❯ docker ps --format '{{.Names}}'
streams_redisinsight_1
streams_redis_1
❯ docker exec -it streams_redis_1 sh
/data # redis-cli
127.0.0.1:6379> XADD mystream * sensor-id 1234 temperature 15.1
"1644856873947-0"
127.0.0.1:6379> XADD mystream * sensor-id 1234 temperature 15.0
"1644856875587-0"
127.0.0.1:6379> XADD mystream * sensor-id 1234 temperature 15.4
"1644856876818-0"


Spring Boot Shutdown Problem
The Connection is already closed / Connection reset by peer
Occasionally when I shut down the service I get the following error (it does not happen every time).

The critical parts of the error:
- [cTaskExecutor-1] io.lettuce.core.RedisChannelHandler : Connection is already closed
- [cTaskExecutor-1] ageListenerContainer$LoggingErrorHandler : Unexpected error occurred in scheduled task.
- org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: Connection closed
I also saw error/exception messages like:
- [cTaskExecutor-1] ageListenerContainer$LoggingErrorHandler : Unexpected error occurred in scheduled task.
- org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
- Caused by: java.io.IOException: Connection reset by peer
- [oundedElastic-1] o.s.b.a.r.RedisReactiveHealthIndicator : Redis health check failed
How I Made Progress Investigating the Error
While looking through the code associated with the exception, I gravitated towards StreamPollTask, which looks to be marshaled by StreamMessageListenerContainer. I then found this spring-data-redis GitHub issue, https://github.com/spring-projects/spring-data-redis/issues/2246, where a spring-data-redis contributor suggests that the StreamMessageListenerContainer needs to be shut down before the application.
This led me to look at two classes in more detail:
- StreamMessageListenerContainer: it defines the Redis calls (in a Java Function), orchestrates the subscriptions, and creates the StreamPollTask.
- StreamPollTask: it contains a few interesting structures, including a CountDownLatch and a doLoop() function.
These are from the package org.springframework.data.redis.stream https://github.com/spring-projects/spring-data-redis
The next big step for me came when I started experimenting with waiting on the Subscription object via subscription.isActive combined with modifying the StreamMessageListenerContainerOptions and pollTimeout.
I found that the longer my pollTimeout was configured on the StreamMessageListenerContainer, the longer it would take for subscription.isActive to be false. Note that pollTimeout has a default of 2 seconds.
I struggled here because I assumed there must be a better way to wait for the blocking call to Redis to complete (the one involved with the pollTimeout).
- docs.spring.io pollTimeout https://docs.spring.io/spring-data/redis/docs/2.2.6.RELEASE/api/org/springframework/data/redis/stream/StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder.html#pollTimeout-java.time.Duration-
- docs.spring.io and the StreamReadOptions https://docs.spring.io/spring-data/redis/docs/2.2.0.M4/api/index.html?org/springframework/data/redis/connection/stream/StreamReadOptions.html
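The relationship between pollTimeout and shutdown time can be illustrated without Redis at all. The sketch below is a simulation only: FakePollTask is a hypothetical stand-in that sleeps where a real task would make a blocking read, and it is not how spring-data-redis implements StreamPollTask. It assumes the task only notices a cancel request between blocking calls, which is consistent with what I observed.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread

// Hypothetical stand-in for a poll task; NOT the spring-data-redis implementation.
class FakePollTask(private val pollTimeoutMs: Long) {
    private val cancelled = AtomicBoolean(false)

    @Volatile
    var isActive = false
        private set

    fun start(): Thread {
        isActive = true
        return thread {
            // The cancel flag is only checked between "blocking reads".
            while (!cancelled.get()) {
                Thread.sleep(pollTimeoutMs) // stands in for a blocking read with a timeout
            }
            isActive = false
        }
    }

    fun cancel() = cancelled.set(true)
}

// Measures how long the fake task stays active after cancel() is called.
fun millisUntilInactive(pollTimeoutMs: Long): Long {
    val task = FakePollTask(pollTimeoutMs)
    val worker = task.start()
    Thread.sleep(20) // let the task enter its first "blocking read"
    val start = System.nanoTime()
    task.cancel()
    worker.join()
    return (System.nanoTime() - start) / 1_000_000
}

fun main() {
    for (timeout in listOf(100L, 500L)) {
        println("pollTimeout=$timeout ms -> inactive after ~${millisUntilInactive(timeout)} ms")
    }
}
```

In this simulation the wait after cancel() grows with the configured timeout, because the in-flight "read" has to finish before the flag is checked again.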
Workaround for the Issue
This resulted in a modification of our StreamConsumer.kt class:
- The important piece is the @PreDestroy annotation, where we block and poll while waiting for the subscription to finally stop.
package com.example.StreamConsumerDemo2
import org.springframework.data.redis.connection.RedisConnectionFactory
import org.springframework.data.redis.connection.stream.Consumer
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.connection.stream.ReadOffset
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.stream.StreamMessageListenerContainer
import org.springframework.data.redis.stream.Subscription
import org.springframework.stereotype.Component
import java.time.Duration
import java.util.concurrent.TimeUnit
import javax.annotation.PreDestroy
@Component
class StreamConsumer(
    redisConnectionFactory: RedisConnectionFactory,
    streamListener: MyStreamListener,
) {
    final val POLL_TIMEOUT = 1000L
    final var container: StreamMessageListenerContainer<String, MapRecord<String, String, String>>
    final var subscription: Subscription
    init {
        val containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
            .pollTimeout(Duration.ofMillis(POLL_TIMEOUT))
            .build()
        container = StreamMessageListenerContainer.create(redisConnectionFactory, containerOptions)
        val consumer = Consumer.from("mygroup", "Alice")
        subscription = container.receive(
            consumer,
            StreamOffset.create("mystream", ReadOffset.lastConsumed()),
            streamListener
        )
        container.start()
    }
    @PreDestroy
    fun preDestroy() {
        println("PreDestroy subscription - subscription?.isActive: ${subscription.isActive}")
        // Timing how long it takes https://stackoverflow.com/questions/1770010/how-do-i-measure-time-elapsed-in-java
        val startTime = System.nanoTime()
        // Using container.stop() since it already calls subscription.cancel()
        container.stop()
        //subscription.cancel()
        while (subscription.isActive) {
            //println("wait... 10ms")
            Thread.sleep(10)
        }
        val completionTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
        println("Time required for subscription.isActive==false : $completionTime ms")
        println("PreDestroy subscription - subscription?.isActive: ${subscription.isActive}")
    }
}
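One risk with the while loop in preDestroy() is that it never returns if the subscription never becomes inactive. A minimal sketch of a bounded wait follows. awaitInactive is a hypothetical helper (not a spring-data-redis API), and the timeout values are arbitrary assumptions chosen for the demo.

```kotlin
import java.time.Duration

// Hypothetical helper: polls isActive until it returns false or the timeout elapses.
// Returns true if the subscription became inactive in time, false if we gave up.
fun awaitInactive(
    isActive: () -> Boolean,
    timeout: Duration,
    pollInterval: Duration = Duration.ofMillis(10),
): Boolean {
    val deadline = System.nanoTime() + timeout.toNanos()
    while (isActive()) {
        if (System.nanoTime() - deadline >= 0) return false
        Thread.sleep(pollInterval.toMillis())
    }
    return true
}

fun main() {
    // Simulated subscription that stays active for about 50 ms.
    val stopsAt = System.nanoTime() + Duration.ofMillis(50).toNanos()
    val stopped = awaitInactive({ System.nanoTime() < stopsAt }, Duration.ofSeconds(1))
    println("stopped in time: $stopped")

    // Simulated subscription that never stops; we give up after 100 ms.
    val gaveUp = !awaitInactive({ true }, Duration.ofMillis(100))
    println("gave up waiting: $gaveUp")
}
```

In preDestroy() this could be called as awaitInactive({ subscription.isActive }, Duration.ofMillis(2 * POLL_TIMEOUT)) after container.stop(), logging a warning when it returns false.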
Summary
I think my solution leaves a lot to be desired, and I suspect my lack of familiarity with Spring Boot is probably evident. I have spent some time researching and trying to experiment with different shutdown strategies, but nothing has come to light that improves what I have above.
You can find the source code here:
