Kafka consumer migration

Migrating a Nest.js Kafka consumer from one app to another by carrying over its offsets, to avoid losing messages or processing them twice.


Quick way

Say we want to switch off a consumer (app A) that has been consuming from a topic, heroes for example, and have app B continue from where app A stopped. The simple way is to keep using the same groupId. If app A's consumer groupId is consumer-app-A, app B has to use consumer-app-A as well. Both apps can then run at the same time: Kafka rebalances the partitions between them, and afterward we can turn off app A without any downtime. Easy-peasy.
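
As a minimal sketch of the quick way, both apps could share a single groupId constant. The clientId values, the ConsumerSettings interface and the SHARED_GROUP_ID name are assumptions made for illustration; the only thing that has to match is the groupId.

```typescript
// Hypothetical shape of the consumer-related options each app passes to its Kafka client.
interface ConsumerSettings {
  clientId: string;
  groupId: string;
}

// The group name both apps must agree on (taken from the example above).
const SHARED_GROUP_ID = 'consumer-app-A';

// App A: the consumer being retired.
const appA: ConsumerSettings = { clientId: 'app-A', groupId: SHARED_GROUP_ID };

// App B: the replacement. Only the clientId differs, so Kafka treats both
// apps as members of one consumer group and rebalances partitions between them.
const appB: ConsumerSettings = { clientId: 'app-B', groupId: SHARED_GROUP_ID };

console.log(appA.groupId === appB.groupId); // true
```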

Longer way

The above solution is pretty fast, but down the road it might lead to confusion: why do we have a consumer group named consumer-app-A when no such app exists? If someone new stumbles on the project or, heaven forbid, we lose all of our legacy developers, figuring out who this consumer belongs to could complicate debugging quite a bit. To avoid future complications, we also need to change the name to a proper one: consumer-app-B.

TL;DR
We need to stop app A for a period of time so that it does not consume messages, then use the Kafka CLI commands to copy the offsets of the consumer-app-A consumer group into the consumer-app-B consumer group. When the new consumer starts, it will have the same offsets to continue from.

Docker Kafka and Kafka UI setup

Set up the Nest.js project with nest cli:

nest new my-kafka-consumer

Set up Docker Compose for Kafka and Kafka UI (works on Mac M chips) in docker-compose.yaml:

version: '3'

networks:
  app-tier:
    driver: bridge

services:
  kafka:
    image: 'bitnami/kafka:latest'
    container_name: kafka
    hostname: kafka
    ports:
      - '9092:9092'
    networks:
      - app-tier
    environment:
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:29093
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8090:8080"
    depends_on:
      - kafka
    networks:
      - app-tier
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS: kafka:29092

Start the Docker containers

docker compose up -d

Nest.js setup

Install the following npm libraries in the project:

npm install @nestjs/microservices kafkajs

Create a Kafka module in the kafka/ directory with a configuration file we will re-use:

kafka.config.ts

import { ClientProviderOptions, Transport } from '@nestjs/microservices';

export const kafkaConfig: ClientProviderOptions = {
  name: 'HERO_SERVICE',
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero-app',
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-app-consumer',
      allowAutoTopicCreation: true,
    },
    subscribe: {
      fromBeginning: true,
    },
  },
};

subscribe fromBeginning
It's important to set fromBeginning to true. When the consumer group has no committed offset yet, the consumer would otherwise start from the latest offset and only pull messages produced while it is connected, so everything from the past would be skipped. Once the group has committed offsets, the consumer resumes from them.
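
The effect of fromBeginning can be pictured with a small sketch. This is not KafkaJS code, only an illustration of the rule it applies: a committed offset always wins, and fromBeginning only decides where to start when the group has none. The function, interface and sample values are hypothetical.

```typescript
// Offsets known for a single partition. `committed` is null when the
// consumer group has never committed an offset for this partition.
interface PartitionOffsets {
  earliest: number;
  latest: number;
  committed: number | null;
}

// Illustrative only: picks the offset a consumer would start reading from.
function resolveStartOffset(p: PartitionOffsets, fromBeginning: boolean): number {
  if (p.committed !== null) {
    return p.committed; // an existing group resumes where it left off
  }
  return fromBeginning ? p.earliest : p.latest;
}

const freshGroup: PartitionOffsets = { earliest: 0, latest: 42, committed: null };
const existingGroup: PartitionOffsets = { earliest: 0, latest: 42, committed: 10 };

console.log(resolveStartOffset(freshGroup, true));     // 0
console.log(resolveStartOffset(freshGroup, false));    // 42
console.log(resolveStartOffset(existingGroup, false)); // 10
```

This is also why copying offsets to the new group works: once the new group has committed offsets, it continues from them regardless of fromBeginning.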

consumer groupId
The groupId is what the short/easy way from the start of the article relies on: it identifies the whole group of consumers that read from the same topic, and it is also what Kafka uses to spread partitions when scaling out consumers.

Update main.ts bootstrap function to start the microservices and enable shutdown hooks.

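import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { kafkaConfig } from './kafka/kafka.config';

async function bootstrap() {
    const app = await NestFactory.create(AppModule);

    // Attach the Kafka consumer (microservice) to the HTTP app and start it.
    app.connectMicroservice(kafkaConfig);
    await app.startAllMicroservices();

    // Let Nest run the onModuleDestroy hooks on shutdown signals.
    app.enableShutdownHooks();

    await app.listen(3000);
}
bootstrap();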

Create a kafka.service.ts which will handle the bootstrapping of the Kafka connection on app start and shutdown.

import {
  Inject,
  Injectable,
  OnModuleDestroy,
  OnModuleInit,
} from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';

@Injectable()
export class KafkaService implements OnModuleInit, OnModuleDestroy {
  public constructor(
    @Inject('HERO_SERVICE')
    private readonly client: ClientKafka,
  ) {}

  public async onModuleInit() {
    await this.client.connect();
  }

  public async onModuleDestroy() {
    await this.client.close();
  }

  async emit(topic: string, message: object): Promise<void> {
    await lastValueFrom(this.client.emit(topic, message));
  }
}

Export the Kafka module in kafka.module.ts:

import { Module } from '@nestjs/common';
import { ClientsModule } from '@nestjs/microservices';
import { KafkaService } from './kafka.service';
import { kafkaConfig } from './kafka.config';

@Module({
  imports: [ClientsModule.register([kafkaConfig])],
  providers: [KafkaService],
  exports: [KafkaService],
})
export class KafkaModule {}

Import it in the app.module.ts

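import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { KafkaModule } from './kafka/kafka.module';

@Module({
    imports: [KafkaModule],
    controllers: [AppController],
    providers: [AppService],
})
export class AppModule {}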

Now we can register the consumer/handler of the topic in app.controller.ts:

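import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';

@Controller()
export class AppController {
    // Called for every message consumed from the heroes topic.
    @EventPattern('heroes')
    public async handleHeroes(@Payload() hero: object): Promise<void> {
        console.log('Processing hero', hero);
    }
}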


So, in the example below, we would only be processing the messages from offset 10 and above.

Figure: Kafka UI messages

Start the worker, but only after we copy the group metadata in the next step:

npm run start:dev

Copying consumer group metadata

First, we need to download and extract the Kafka binary distribution, which gives us access to the scripts that make management easier.

Here is the reference documentation for the commands, with examples.

Example of listing all consumer groups:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

Nest.js creates 2 consumer groups, one for the client and one for the server. Its Kafka transport appends a -client or -server suffix to the configured groupId (and clientId) so that the two components do not collide:

hero-app-consumer-client
hero-app-consumer-server

We will continue with the -server one, as it is the one actually connected to the topic according to Kafka UI.
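
Because of these suffixes, the group name to pass to the Kafka CLI is not exactly the groupId from kafka.config.ts. A small helper like the one below can derive it. The function is hypothetical and simply assumes the -server and -client suffixes seen in the listing above.

```typescript
type KafkaRole = 'server' | 'client';

// Hypothetical helper: builds the consumer group name as it shows up in
// Kafka, assuming a "-<role>" suffix is appended to the configured groupId.
function effectiveGroupId(groupId: string, role: KafkaRole): string {
  return `${groupId}-${role}`;
}

console.log(effectiveGroupId('hero-app-consumer', 'server'));  // hero-app-consumer-server
console.log(effectiveGroupId('hero-app2-consumer', 'server')); // hero-app2-consumer-server
```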

Consumer state during copy

Note that you must shut down the consumer and wait for Kafka to consider the group to be in the Empty state before starting the copy. Otherwise, you will get the following error message in the output:

Error: Assignments can only be reset if the group 'hero-app-consumer-server' is inactive, but the current state is Stable.
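
Waiting for the group to become inactive can be scripted. The sketch below polls a state lookup until it reports Empty. The lookup is passed in as a function so the example runs on its own; in a real script it could be backed by an admin client (for instance KafkaJS's admin.describeGroups, whose result includes a state per group), which is an assumption and not part of the setup above. All names here are hypothetical.

```typescript
// Returns the current state of a consumer group, e.g. "Stable" or "Empty".
type GroupStateLookup = (groupId: string) => Promise<string>;

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Polls until the group is Empty. Resolves to true once it is,
// or false if it is still not Empty after `maxAttempts` checks.
async function waitForEmptyGroup(
  lookup: GroupStateLookup,
  groupId: string,
  maxAttempts = 30,
  delayMs = 1000,
): Promise<boolean> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    if ((await lookup(groupId)) === 'Empty') {
      return true;
    }
    if (attempt < maxAttempts) {
      await sleep(delayMs);
    }
  }
  return false;
}

// Stand-in lookup for demonstration: reports Stable twice, then Empty.
const fakeStates = ['Stable', 'Stable', 'Empty'];
const fakeLookup: GroupStateLookup = async () => fakeStates.shift() ?? 'Empty';

waitForEmptyGroup(fakeLookup, 'hero-app-consumer-server', 5, 10).then((ok) =>
  console.log(ok ? 'Group is empty, safe to copy offsets' : 'Group is still active'),
);
```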

Once the group is inactive, the copy itself looks like this:

KAFKA_BROKER=localhost:9092
TOPIC=heroes
FROM_GROUP_NAME=hero-app-consumer-server
TO_GROUP_NAME=hero-app2-consumer-server

# Store consumer group offset into file
bin/kafka-consumer-groups.sh \
  --bootstrap-server $KAFKA_BROKER \
  --export --group $FROM_GROUP_NAME --topic $TOPIC \
  --reset-offsets --to-current \
  --dry-run > offsets.txt

# Create the second consumer group from stored offset
bin/kafka-consumer-groups.sh \
  --bootstrap-server $KAFKA_BROKER \
  --execute --group $TO_GROUP_NAME \
  --reset-offsets --from-file offsets.txt

# Delete first consumer group
bin/kafka-consumer-groups.sh \
  --bootstrap-server $KAFKA_BROKER \
  --group $FROM_GROUP_NAME \
  --delete
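
Before importing offsets.txt it can be worth checking what was exported. The sketch below parses the file content, assuming each line has the form topic,partition,offset (the CSV layout written by --export for a single group; verify it against your Kafka version). The function name and the sample content are hypothetical.

```typescript
interface ExportedOffset {
  topic: string;
  partition: number;
  offset: number;
}

// Parses "topic,partition,offset" lines, skipping blank lines.
// Throws if a line does not match the assumed layout.
function parseExportedOffsets(content: string): ExportedOffset[] {
  return content
    .split('\n')
    .map((line) => line.trim())
    .filter((line) => line.length > 0)
    .map((line) => {
      const match = /^([^,]+),(\d+),(\d+)$/.exec(line);
      if (match === null) {
        throw new Error(`Unexpected line in offsets file: ${line}`);
      }
      return {
        topic: match[1],
        partition: Number(match[2]),
        offset: Number(match[3]),
      };
    });
}

// Sample content, made up for illustration.
const sample = 'heroes,0,10\nheroes,1,7\n';
console.log(parseExportedOffsets(sample));
```

Running a check like this before the --execute step catches an empty or malformed export early, before the old group is deleted.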

To verify that this worked, update kafka.config.ts by changing consumer.groupId to hero-app2-consumer and start the consumer. You should not see any messages being reprocessed, and if you submit a new message, it should appear as normal.
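
To check for reprocessing more rigorously than by reading the logs, the offsets of incoming messages can be compared against the offsets that were copied. A committed offset is the offset of the next message to read, so any message below it has already been handled by the old group. The sketch below is a hypothetical helper with made-up values; in a Nest.js handler the partition and offset would have to be read from the message context (for example via KafkaContext), which is left out so the example runs on its own.

```typescript
// Offsets copied to the new group, keyed by partition (sample values).
const copiedOffsets = new Map<number, number>([
  [0, 10],
  [1, 7],
]);

// Returns true when a message was already covered by the copied offsets,
// i.e. its offset is below the committed offset for its partition.
function isReprocessed(partition: number, messageOffset: number): boolean {
  const committed = copiedOffsets.get(partition);
  return committed !== undefined && messageOffset < committed;
}

console.log(isReprocessed(0, 9));  // true, offset 9 was consumed by the old group
console.log(isReprocessed(0, 10)); // false, offset 10 is the first new message
console.log(isReprocessed(2, 0));  // false, no copied offset for partition 2
```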

Overall, this way requires a bit more work, but you get proper naming and avoid any confusion in the long run.