Locomotive IoT Simulation with NodeJS, Kafka, Mongo, PostgreSQL, Logging, and Telegram Notification

Muhammad Umar Al Fajar
10 min readNov 21, 2023

--

Hello everyone. My name is Umar, and I am currently a software developer at PT. Padepokan 79. On this occasion, I will share how locomotive IoT simulation was made with Spring Scheduler which also implementing message broker and Telegram notification.

For this project, we need several tools such as:

I assume that you have installed all the necessary tools (if not, then you can look up to the link I provided above). After that, let’s proceed to the next step.

Environment Preparation

First of all, we need to create a container for Kafka:

version: "3.8"
services:
zookeeper:
container_name: zookeeper
restart: always
image: bitnami/zookeeper
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
container_name: kafka
restart: always
image: bitnami/kafka
ports:
- "9094:9094"
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
kafdrop:
container_name: kafdrop
image: obsidiandynamics/kafdrop
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: kafka:9092
depends_on:
- kafka

As we can see there are 3 containers, first is Zookeeper, Kafka, and Kafdrop.

Zookeeper is necessary for Kafka because it performs several important functions, such as controller election, cluster membership, topic configuration, quotas, and consumer offsets. For the details, we can refer to the following sources:
1. cloudkafka
2. stackoverflow
3. scaler

As for the Kafdrop, it can help you monitor and manage your Kafka cluster, as well as debug and test your applications that use Kafka as a data source.

MongoDB

Next, create container for MongoDB:

docker container create --name mongodb --publish 27017:27017 mongo

After running the container, run the following command:

docker exec -it mongodb bash

That command allows you to run command in a running container. Then enter the MongoDB:

mongosh

After that, create the database and the collection:

use locomotivedb

db.createCollection("locomotive")

PostgreSQL

For PostgreSQL I've already installed it on my machine. I used pgAdmin as the graphical management tool for PostgreSQL.

Create database named locomotive.

Then, create a table to store the locomotive summary:

CREATE TABLE locomotive_summary (
id uuid not null,
total_locomotive int4 not null,
total_locomotive_poor int4 not null,
total_locomotive_good int4 not null,
total_locomotive_excellent int4 not null,
last_modified_time timestamp null,
constraint pk_locomotive_summary primary key (id)
);

Next, we can start to create the scheduler App.

Creating The Scheduler

For the creation of Scheduler project, I used IntelliJ IDEA, or you can use start.spring.io to generate the Java Spring Boot project.

As for the dependencies, we need Lombok and WebFlux.

First, we will create the required bean. Here, I create a WebClient bean that will be used to call the NodeJS application’s API:

@Configuration
public class UtilObjects {

@Bean
WebClient webClient() {
return WebClient.builder().baseUrl("http://localhost:3050/v1/").build();
}
}

Next, create the model:

@Data
@Builder
public class Locomotive {
private String code;
private String name;
private String dimension;
private String status;
private String time;
}

Then, we create the scheduler:

@Component
public class LocomotiveScheduler {

@Autowired
private WebClient webClient;

@Scheduled(fixedDelay = 10000)
public void scheduler() throws JsonProcessingException {
String[] status = { "Poor", "Good", "Excellent"};
Random random = new Random();

Locomotive locomotive = Locomotive.builder()
.code(UUID.randomUUID().toString().substring(0, 13))
.name("Loco" + UUID.randomUUID().toString().substring(0, 4))
.dimension("10 x 10")
.status(status[random.nextInt(status.length)])
.time(LocalDateTime.now().toString())
.build();

webClient.post().uri("/forward").body(Mono.just(locomotive), Locomotive.class).retrieve().bodyToMono(Void.class).block();
}
}

Now configure the main class:

@EnableScheduling
@SpringBootApplication(scanBasePackages = {"com.main.locomotive.app", "com.main.locomotive.util"}) //adjust to your project
public class LocomotiveApplication {

public static void main(String[] args) {
SpringApplication.run(LocomotiveApplication.class, args);
}

}

Create NodeJS API

First of all, create the project:

npm init -y

After that, install the necessary dependencies:

npm i kafkajs mongodb

Then, create connection to MongoDB:

class Database {

static connection = null;

static getConnection() {
if (!this.connection) {
this.connection = new MongoClient("mongodb://localhost").db(
"locomotivedb"
);
}
return this.connection;
}
}

export { Database };

Next, create a topic before we can configure the Kafka. Make sure the containers are running, run the Kafdrop with visiting: http://localhost:9000.
And then create a new topic:

Now, we can create a configuration for the Kafka:

class KafkaConfig {
constructor() {
this.kafka = new Kafka({
clientId: "nodejs-kafka",
brokers: ["localhost:9094"],
});
this.producer = this.kafka.producer({
createPartitioner: Partitioners.LegacyPartitioner,
});
this.consumer = this.kafka.consumer({ groupId: "a" });
}

async produce(topic, messages) {
try {
await this.producer.connect();
await this.producer.send({
topic: topic,
messages: messages,
});
} catch (error) {
console.error(error);
} finally {
await this.producer.disconnect();
}
}

async consume(topic, callback) {
try {
await this.consumer.connect();
await this.consumer.subscribe({ topic: topic, fromBeginning: true });
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const value = message.value.toString();
callback(value);
},
});
} catch (error) {
console.error(error);
}
}
}

export default KafkaConfig;

After that, create the necessary service:

const forward = async (req, res) => {
try {
const data = req.body;
const kafkaConfig = new KafkaConfig();
const messages = [{ value: JSON.stringify(data) }];
kafkaConfig.produce("locomotive", messages);

res.status(200).json({
message: "Message successfully send!",
});
} catch (error) {
console.log(error);
}
};

export { forward };

Next, create the controller:

const router = express.Router();

router.route("/forward").post(forward);

export default router;

And then, create the server:

const connection = Database.getConnection().collection("locomotive");

const PORT = 3050;
const app = express();

app.use(express.json());

app.use("/v1", forwardController);

const kafkaConfig = new KafkaConfig();
kafkaConfig.consume("locomotive", (value) => {
const obj = JSON.parse(value);

connection.insertOne({
_id: obj.code,
locoName: obj.came,
locoDimension: obj.dimension,
status: obj.status,
time: obj.time,
});
});

app.listen(PORT, () => console.log(`Server running on port ${PORT}`));

Create Scheduler Report

First of all, generate the project:

I used IntelliJ IDEA to generate the project (or you can visit start.spring.io).
For the dependencies, we need:

As for the telegram dependencies, we need to add it manually:

<dependency>
<groupId>org.telegram</groupId>
<artifactId>telegrambots-spring-boot-starter</artifactId>
<version>6.7.0</version>
</dependency>
<dependency>
<groupId>org.telegram</groupId>
<artifactId>telegrambots-abilities</artifactId>
<version>6.7.0</version>
</dependency>

After that, go to application.properties to configure the database:

spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=locomotivedb
spring.data.mongodb.authentication-database=admin

spring.datasource.url= jdbc:postgresql://localhost:5432/locomotive
spring.datasource.username= postgres
spring.datasource.password= YOUR_PASSWORD

spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation= true
spring.jpa.properties.hibernate.dialect= org.hibernate.dialect.PostgreSQLDialect
spring.datasource.driverClassName=org.postgresql.Driver

# Hibernate ddl auto (create, create-drop, validate, update)
spring.jpa.hibernate.ddl-auto=update

Now, let’s create the model for MongoDB and PostgreSQL:

@Data
@Builder
@Document(collection = "locomotive")
public class Locomotive {
@Id
private String code;
private String name;
private String dimension;
private String status;
private String time;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Builder
@Table(name = "locomotive_summary")
@EntityListeners(AuditingEntityListener.class)
public class LocomotiveSummary {
@Id
@GeneratedValue(generator = "uuid2")
@GenericGenerator(name = "uuid2", strategy = "org.hibernate.id.UUIDGenerator")
@Column(name = "id")
private UUID id;

@Column(name = "total_locomotive")
private int totalLocomotive;

@Column(name = "total_locomotive_poor")
private int totalLocomotivePoor;

@Column(name = "total_locomotive_good")
private int totalLocomotiveGood;

@Column(name = "total_locomotive_excellent")
private int totalLocomotiveExcellent;

@LastModifiedDate
@Column(name = "last_modified_time")
private LocalDateTime lastModifiedTime;
}

After that, create the repositories:

public interface LocomotiveRepository extends MongoRepository<Locomotive, String> {
long countByStatus(String status);
}
public interface LocomotiveSummaryRepository extends JpaRepository<LocomotiveSummary, UUID> {

}

Before we configure the Telegram bot, we need to create the bot first to get the token:

And then create the TelegramBot bean object:

@Component
public class TelegramBot extends AbilityBot {

public TelegramBot() {
super(YOUR_BOT_TOKEN, "loco_summary_bot");
}

@Override
public long creatorId() {
return 1L;
}

public void sendTextMessage(String chatId, String text) {
SendMessage message = new SendMessage();
message.setChatId(chatId);
message.setText(text);

try {
execute(message);
} catch (TelegramApiException e) {
e.printStackTrace();
}
}
}

After that, we need our account’s chat ID by visiting IDBot and then do some commands below:

Copy the ID and then create the Scheduler:

@Component
@Slf4j
public class Schedular {

@Autowired
private LocomotiveRepository locomotiveRepository;

@Autowired
private LocomotiveSummaryRepository locomotiveSummaryRepository;

@Autowired
private TelegramBot telegramBot;

@Scheduled(fixedDelay = 10000)
public void scheduler() {
locomotvefSummaryRepository.deleteAll();

int totalLocomotive = (int) locomotiveRepository.count();
int totalPoorLocomotive = (int) locomotiveRepository.countByStatus("Poor");
int totalGoodLocomotive = (int) locomotiveRepository.countByStatus("Good");
int totalExcellentLocomotive = (int) locomotiveRepository.countByStatus("Excellent");

LocomotiveSummary locomotiveSummary = LocomotiveSummary.builder()
.totalLocomotive(totalLocomotive)
.totalLocomotivePoor(totalPoorLocomotive)
.totalLocomotiveGood(totalGoodLocomotive)
.totalLocomotiveExcellent(totalExcellentLocomotive)
.build();

locomotiveSummaryRepository.save(locomotiveSummary);

log.info("Summary Update");
telegramBot.sendTextMessage(YOUR_CHAT_ID,
String.format("""
<====== Locomotive Summary ======>

Total Locomotive = %d
Total Poor Locomotive = %d
Total Good Locomotive = %d
Total Excellent Locomotive = %d
""", totalLocomotive, totalPoorLocomotive, totalGoodLocomotive,
totalExcellentLocomotive));
}
}
  • The code above demonstrates how I generated a data summary from mongodb and logged it. We input the ID when we want to use the bot to send a message. After that, we have to adjust the logging to keep the log in a file (logback.xml) in the resources folder (where the application.properties file also exists):
<configuration>

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>application.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>application-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<maxFileSize>100MB</maxFileSize>
<maxHistory>60</maxHistory>
<totalSizeCap>20GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>%-4relative %d{HH:mm:ss} %d{yyyy-MM-dd} [%thread] %-5level %logger{35} - %msg%n</pattern>
</encoder>
</appender>

<root level="info">TimeBasedRollingPolicy
<appender-ref ref="FILE"/>
</root>

</configuration>

I implemented a rolling file appender so that when it reaches a certain criteria, the logging application will create a new file.
After that, we will create an api for the web monitoring application. First, we will create the service:

@Service
public class AdminService {

@Autowired
private LocomotiveSummaryRepository locomotiveSummaryRepository;

public LocomotiveSummary getLocomotiveSummary() {
return locomotiveSummaryRepository.findAll().get(0);
}
}

And then create the controller:

@RestController
@RequestMapping("/v1/admin")
@CrossOrigin
public class AdminController {

@Autowired
private AdminService adminService;

@GetMapping("/summary")
public ResponseEntity<LocomotiveSummary> geLocomotiveSummary() {
return ResponseEntity.ok(adminService.getLocomotiveSummary());
}
}

When you are done, don’t forget to create an auditing bean:

@Configuration
@EnableJpaAuditing(dateTimeProviderRef = "auditingDateTimeProvider")
public class JpaAuditingConfiguration {

@Bean(name = "auditingDateTimeProvider")
DateTimeProvider dateTimeProvider() {
return () -> Optional.of(LocalDateTime.now());
}
}

The bean is used to generate the content of the property automatically when saving data if there is a property that uses annotations such as @CreatedDate, @LastModifiedDate, etc.

Now, we can start to configure the main class:

@EnableScheduling
@SpringBootApplication(scanBasePackages = "com.project") //adjust to your project
@EnableMongoRepositories(basePackages = "com.project.repository.mongodb") //adjust to your project
@EnableJpaRepositories(basePackages = "com.project.repository.postgre") //adjust to your project
@EntityScan(basePackages = { "com.project.model" }) //adjust to your projectF
public class ReportApplication implements ApplicationContextAware {
private static ApplicationContext ctx;

public static void main(String[] args) throws BeansException, TelegramApiException {
SpringApplication.run(ReportApplication.class, args);

TelegramBotsApi botsApi = new TelegramBotsApi(DefaultBotSession.class);
botsApi.registerBot(ctx.getBean("telegramBot", AbilityBot.class));
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
ctx = applicationContext;
}
}

In the code above, we performed a scan on the repository and model that we created. Then, we registered the bot that we created.

Create Dashboard

I used WebStorm to generate the React + Vite project for the dashboard:

Next, install all the necessary dependencies:

"dependencies": {
"@emotion/react": "^11.11.1",
"@emotion/styled": "^11.11.0",
"@fontsource/inter": "^5.0.15",
"@fontsource/poppins": "^5.0.8",
"@fontsource/roboto": "^5.0.8",
"@mui/icons-material": "^5.14.16",
"@mui/material": "^5.14.17",
"@reduxjs/toolkit": "^1.9.7",
"react": "^18.2.0",
"react-dom": "^18.2.0",
"react-google-charts": "^4.0.1",
"react-redux": "^8.1.3",
"react-router-dom": "^6.18.0"
},

After that, configure the Redux:

const baseQuery = fetchBaseQuery({
baseUrl: "http://localhost:8080/v1",
});

export const apiSlice = createApi({
baseQuery,
tagTypes: [],
endpoints: (builder) => ({}),
onError: () => {},
});

Next, create the endpoint:

export const adminApiSlice = apiSlice.injectEndpoints({
endpoints: (builder) => ({
locomotiveSummary: builder.query({
query: () => ({
url: "/admin/summary",
}),
}),
}),
});

export const { useLocomotiveSummaryQuery } = adminApiSlice;

Then, create the store:

export const store = configureStore({
reducer: {
[apiSlice.reducerPath]: apiSlice.reducer,
},
middleware: (getDefaultMiddleware) =>
getDefaultMiddleware().concat(apiSlice.middleware),
devTools: true,
});

After that, go to main.jsx and wrap the App with the store:

ReactDOM.createRoot(document.getElementById("root")).render(
<React.StrictMode>
<Provider store={store}>
<App />
</Provider>
</React.StrictMode>
);

Finally, create the user interface:

SummaryBox.jsx

const SummaryBox = ({ title, value, backgroundColor, Logo, size, color }) => {
return (
<Box
display="flex"
sx={{
backgroundColor,
width: "300px",
height: "150px",
borderRadius: "12px",
}}
>
<Box
flex={1.6}
display="flex"
flexDirection="column"
justifyContent="center"
rowGap="25px"
pl="10px"
pb="7px"
boxSizing="border-box"
sx={{ height: "100%" }}
>
<Typography
style={{
fontFamily: "Poppins",
fontSize: "17px",
fontWeight: 500,
color: "#fff",
textShadow: "1px 1px 2px rgba(0, 0, 0, 0.45)",
}}
>
{title}
</Typography>
<Typography
style={{
fontFamily: "Inter",
fontSize: "20px",
fontWeight: 500,
color: "#fff",
textShadow: "1px 1px 2px rgba(0, 0, 0, 0.45)",
}}
>
{value}
</Typography>
</Box>
<Box flex={1} display="flex" alignItems="center" height="100%">
<Logo sx={{ fontSize: size, color }} />
</Box>
</Box>
);
};

export default SummaryBox;

App.jsx

function App() {
const { data } = useLocomotiveSummaryQuery("", { pollingInterval: 10000 });

const chartData = [
["Status", "Total"],
["Excellent", data?.totalLocomotiveExcellent],
["Poor", data?.totalLocomotivePoor],
["Good", data?.totalLocomotiveGood],
];

const options = {
titleTextStyle: {
color: "#ffffff", // Set the font color for the title
},
legend: {
textStyle: {
color: "#ffffff", // Set the font color for the legend text
},
},
pieHole: 0.4,
is3D: false,
backgroundColor: '#3d3b3b',
chartArea: {
width: "100%",
height: "85%",
},
slices: {
0: { color: "#7aff52" },
1: { color: "#ff7878" },
2: { color: "#ffe53b" },
},
pieSliceBorderColor: "#4d4d4d",
pieSliceTextStyle: {
color: "#4d4d4d",
},
};

return (
<Box display="flex" sx={{ width: "100vw", height: "100vh", flexDirection: "column" }}>
<Appbar flexSize={0.8} />
<Box flex={6} p={2} sx={{ backgroundColor: "#565656", flexDirection: "row" }}>
<Box display="flex" columnGap="18px">
<SummaryBox
title="Total Locomotive"
value={data?.totalLocomotive}
backgroundColor="#91dffa"
Logo={CircleIcon}
size={100}
color="#a1e7ff"
/>
<SummaryBox
title="Total Locomotive Poor"
value={data?.totalLocomotivePoor}
backgroundColor="#ff7878"
Logo={ClearIcon}
size={100}
color="#fc9797"
/>
<SummaryBox
title="Total Locomotive Good"
value={data?.totalLocomotiveGood}
backgroundColor="#ffe53b"
Logo={CheckIcon}
size={100}
color="#fff4a3"
/>
<SummaryBox
title="Total Locomotive Excellent"
value={data?.totalLocomotiveExcellent}
backgroundColor="#7aff52"
Logo={DoneAllIcon}
size={100}
color="#a3ff87"
/>
</Box>
<Box display="flex" width="35%" height="35%" borderRadius={6} sx={{ overflow: 'hidden', marginTop: '12px' }}>
<Chart
chartType="PieChart"
data={chartData}
options={options}
width={"100%"}
height={"100%"}
/>
</Box>
</Box>
</Box>
);
}

export default App;

That’s all from this mini project. I hope you can gain something from this.

Thank you.

--

--