Kafka consumer migration
Migrating Nest.js Kafka consumer from one app to another using the offset to avoid losing messages or causing duplicate processing.
Quick way
When we want to switch off a consumer (app A) that has been consuming from a topic, heroes for example, and have app B start consuming from where app A stopped, we can do it the simple way: just keep using the same groupId. So if the consumer groupId of app A is consumer-app-A, app B has to use the same groupId, consumer-app-A. This way we can start both apps at the same time, let Kafka rebalance them, and afterward turn off app A without any downtime, easy-peasy.
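As a rough sketch of the quick way, the only thing app B has to share with app A is the consumer group ID. The ConsumerOptions type and the clientId values below are illustrative assumptions, a simplified stand-in for the real Nest.js Kafka options:

```typescript
// Simplified stand-in for the relevant part of the Kafka consumer options.
interface ConsumerOptions {
  clientId: string; // identifies the app towards the broker
  groupId: string; // identifies the consumer group that owns the offsets
}

// App A: the consumer we want to retire.
const appA: ConsumerOptions = {
  clientId: 'app-A',
  groupId: 'consumer-app-A',
};

// App B: a different app that joins the SAME group,
// so it continues from the offsets that group has already committed.
const appB: ConsumerOptions = {
  clientId: 'app-B',
  groupId: appA.groupId,
};

console.log(appB.groupId); // consumer-app-A
```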
Longer way
The above solution is pretty fast, but later down the road it might lead to confusion: why do we have a consumer group named consumer-app-A when we don't have such an app? If someone new stumbles on the project or, heaven forbid, we lose all of our legacy developers, figuring out who this consumer is could complicate debugging quite a bit. To avoid future complications, we also need to change the name and use a proper one: consumer-app-B.
Docker Kafka and Kafka UI setup
Set up the Nest.js project with nest cli:
nest new my-kafka-consumer
Set up Docker Compose with Kafka and Kafka UI (works on Mac M chips) in docker-compose.yaml:
version: '3'
networks:
  app-tier:
    driver: bridge
services:
  kafka:
    image: 'bitnami/kafka:latest'
    container_name: kafka
    hostname: kafka
    ports:
      - '9092:9092'
    networks:
      - app-tier
    environment:
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:29093
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8090:8080"
    depends_on:
      - kafka
    networks:
      - app-tier
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS: kafka:29092
Start the Docker containers
docker compose up -d
Nest.js setup
Install the following npm libraries in the project:
npm install @nestjs/microservices kafkajs
Create a Kafka module in the kafka/ directory with a configuration file we will re-use, kafka.config.ts:
import { ClientProviderOptions, Transport } from '@nestjs/microservices';

export const kafkaConfig: ClientProviderOptions = {
  name: 'HERO_SERVICE',
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero-app',
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-app-consumer',
      allowAutoTopicCreation: true,
    },
    subscribe: {
      fromBeginning: true,
    },
  },
};
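Update the bootstrap function in main.ts to start the microservices and enable shutdown hooks:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { kafkaConfig } from './kafka/kafka.config';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  // Attach the Kafka consumer to the HTTP app and start it
  app.connectMicroservice(kafkaConfig);
  await app.startAllMicroservices();
  app.enableShutdownHooks();
  await app.listen(3000);
}
bootstrap();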
Create a kafka.service.ts, which will handle the bootstrapping of the Kafka connection on app start and shutdown:
import {
  Inject,
  Injectable,
  OnModuleDestroy,
  OnModuleInit,
} from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';

@Injectable()
export class KafkaService implements OnModuleInit, OnModuleDestroy {
  public constructor(
    @Inject('HERO_SERVICE')
    private readonly client: ClientKafka,
  ) {}

  public async onModuleInit() {
    await this.client.connect();
  }

  public async onModuleDestroy() {
    await this.client.close();
  }

  async emit(topic: string, message: object): Promise<void> {
    await lastValueFrom(this.client.emit(topic, message));
  }
}
Export the Kafka module in kafka.module.ts:
import { Module } from '@nestjs/common';
import { ClientsModule } from '@nestjs/microservices';
import { KafkaService } from './kafka.service';
import { kafkaConfig } from './kafka.config';

@Module({
  imports: [ClientsModule.register([kafkaConfig])],
  providers: [KafkaService],
  exports: [KafkaService],
})
export class KafkaModule {}
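Import it in app.module.ts:
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { KafkaModule } from './kafka/kafka.module';

@Module({
  imports: [KafkaModule],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}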
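Now we can register the consumer/handler of the topic in app.controller.ts:
import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';

@Controller()
export class AppController {
  // Called for every message published to the heroes topic
  @EventPattern('heroes')
  public async handleHeroes(@Payload() hero: object): Promise<void> {
    console.log('Processing hero', hero);
  }
}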
So in the example below, we would only be processing the messages from offset 10 onward.
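Where a consumer starts depends on whether its group already has a committed offset: the fromBeginning setting only matters when nothing has been committed yet. A minimal sketch of that rule, using a hypothetical helper (the function name and the plain-number offsets are simplifications for illustration):

```typescript
// committed: the group's committed offset for a partition, or null if none exists.
// earliest / latest: the first available offset and the end of the partition's log.
function resolveStartOffset(
  committed: number | null,
  fromBeginning: boolean,
  earliest: number,
  latest: number,
): number {
  if (committed !== null) return committed; // a committed offset always wins
  return fromBeginning ? earliest : latest;
}

console.log(resolveStartOffset(10, true, 0, 25)); // 10
console.log(resolveStartOffset(null, true, 0, 25)); // 0
console.log(resolveStartOffset(null, false, 0, 25)); // 25
```

This is why copying the offsets to the new group matters: without them, a group with fromBeginning set to true would reprocess the whole topic.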
Start the worker, but only after we have copied the consumer group metadata in the next step:
npm run start:dev
Copying consumer group metadata
First, we need to download and extract the Kafka binary distribution, which ships with the shell scripts we will use for easier management.
Here is the reference documentation for the commands, with examples.
Example of listing all consumer groups known to the broker:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
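Nest.js creates 2 consumer groups here, one with a -client suffix and one with a -server suffix. By default, Nest.js appends the role of the component (ClientKafka or ServerKafka) to the configured groupId to prevent collisions between the client and server sides of the same app.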
hero-app-consumer-client
hero-app-consumer-server
But we will continue with the -server one as it’s the one actually connected to the topic based on Kafka UI.
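The group names above follow a simple pattern: the configured groupId plus a -client or -server suffix. A hypothetical helper (not part of Nest.js) that derives the name to pass to the Kafka scripts could look like this:

```typescript
type KafkaRole = 'client' | 'server';

// Mirrors the naming seen above: the role is appended to the configured groupId.
function effectiveGroupId(groupId: string, role: KafkaRole): string {
  return `${groupId}-${role}`;
}

console.log(effectiveGroupId('hero-app-consumer', 'server')); // hero-app-consumer-server
console.log(effectiveGroupId('hero-app2-consumer', 'server')); // hero-app2-consumer-server
```

Keeping this in mind avoids copying offsets to a group name that the new app will never actually join.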
Note that you must shut down the consumer and wait for Kafka to report the group in the Empty state before starting the copy.
Otherwise, you will get the following error message in the output:
Error: Assignments can only be reset if the group 'hero-app-consumer-server' is inactive, but the current state is Stable.
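Instead of retrying the command by hand, the wait can be scripted. The sketch below polls a group's state until it becomes Empty. The StateLookup function is a hypothetical abstraction introduced here, and the interval and attempt defaults are arbitrary assumptions:

```typescript
// Consumer group states as reported by Kafka.
type GroupState =
  | 'Stable'
  | 'PreparingRebalance'
  | 'CompletingRebalance'
  | 'Empty'
  | 'Dead'
  | 'Unknown';

// Hypothetical lookup: resolves to the current state of a consumer group.
type StateLookup = (groupId: string) => Promise<GroupState>;

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Polls until the group is Empty, or throws after maxAttempts lookups.
async function waitUntilEmpty(
  lookup: StateLookup,
  groupId: string,
  { intervalMs = 1000, maxAttempts = 30 } = {},
): Promise<void> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const state = await lookup(groupId);
    if (state === 'Empty') return;
    await sleep(intervalMs);
  }
  throw new Error(
    `Group ${groupId} did not become Empty after ${maxAttempts} attempts`,
  );
}
```

With kafkajs, the lookup could probably be backed by the admin client's describeGroups call, reading the state field of the returned group. Treat that mapping as an assumption and check it against the kafkajs version you use.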
KAFKA_BROKER=localhost:9092
TOPIC=heroes
FROM_GROUP_NAME=hero-app-consumer-server
TO_GROUP_NAME=hero-app2-consumer-server

# Store consumer group offset into file
bin/kafka-consumer-groups.sh \
  --bootstrap-server "$KAFKA_BROKER" \
  --export --group "$FROM_GROUP_NAME" --topic "$TOPIC" \
  --reset-offsets --to-current \
  --dry-run > offsets.txt

# Create the second consumer group from stored offset
bin/kafka-consumer-groups.sh \
  --bootstrap-server "$KAFKA_BROKER" \
  --execute --group "$TO_GROUP_NAME" \
  --reset-offsets --from-file offsets.txt

# Delete first consumer group
bin/kafka-consumer-groups.sh \
  --bootstrap-server "$KAFKA_BROKER" \
  --group "$FROM_GROUP_NAME" \
  --delete
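The same copy could also be done from code rather than with the shell scripts. The sketch below is an alternative under stated assumptions, not what the scripts do internally: OffsetAdmin is a hypothetical minimal interface modelled on the fetchOffsets and setOffsets methods of the kafkajs admin client (as of kafkajs v2), and copyGroupOffsets is an illustrative helper name.

```typescript
interface PartitionOffset {
  partition: number;
  offset: string; // kept as a string, since Kafka offsets are 64-bit integers
}

interface TopicOffsets {
  topic: string;
  partitions: PartitionOffset[];
}

// Hypothetical minimal admin interface, modelled on the kafkajs admin client.
interface OffsetAdmin {
  fetchOffsets(args: { groupId: string; topics: string[] }): Promise<TopicOffsets[]>;
  setOffsets(args: {
    groupId: string;
    topic: string;
    partitions: PartitionOffset[];
  }): Promise<void>;
}

// Reads the committed offsets of fromGroup and writes them to toGroup.
async function copyGroupOffsets(
  admin: OffsetAdmin,
  fromGroup: string,
  toGroup: string,
  topic: string,
): Promise<PartitionOffset[]> {
  const fetched = await admin.fetchOffsets({ groupId: fromGroup, topics: [topic] });
  const toCopy: PartitionOffset[] = [];
  for (const entry of fetched) {
    if (entry.topic !== topic) continue;
    for (const { partition, offset } of entry.partitions) {
      // Assumption: a negative offset marks a partition with nothing committed yet.
      if (!offset.startsWith('-')) toCopy.push({ partition, offset });
    }
  }
  if (toCopy.length > 0) {
    await admin.setOffsets({ groupId: toGroup, topic, partitions: toCopy });
  }
  return toCopy;
}
```

As with the scripts, the target group must have no running consumers while its offsets are being set.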
To verify this, update kafka.config.ts by changing consumer.groupId to the value hero-app2-consumer (Nest.js appends -server, which gives the hero-app2-consumer-server group we created above) and start the consumer. You should not see any reprocessing of the messages, and if you submit a new message it should appear as normal.
Overall, this way requires a bit more work, but you get proper naming and avoid any confusion in the long run.