Spring Boot, Kafka, and WebSocket: A Practical Approach to Real-Time Messaging

Muhammad Umar Al Fajar
11 min read · Dec 19, 2023


In the world of application development, real-time communication has become an essential feature. Whether it’s a social media app, a collaborative tool, or a live event streaming platform, the ability to exchange information in real-time enhances the user experience significantly. In this article, we will delve into the process of building a real-time chat application using Spring Boot, Kafka, and WebSocket.

Spring Boot, a project built on top of the Spring Framework, simplifies the bootstrapping and development of new Spring applications. Kafka, a distributed streaming platform, allows us to build real-time data pipelines and streaming apps with ease. WebSocket, on the other hand, provides a full-duplex communication channel over a single connection, making it ideal for real-time data transfer.

We will guide you through the process of setting up a Spring Boot application, integrating it with Kafka for message queuing, and using WebSocket for real-time communication. By the end of this article, you will have a solid understanding of how these technologies work together to create a real-time chat application.

If you have not prepared the Kafka environment, read this article. Kafdrop is optional; if you are using a JetBrains IDE, you can use its Kafka extension instead. Links to the repositories are at the end of this article.

Let’s get started!

In the first section of our article, we will focus on setting up our Spring Boot application and integrating it with Kafka.

Spring Boot is a framework that simplifies the setup and development of Spring applications. Features such as auto-configuration and embedded servers make it easier to create production-grade, stand-alone Spring applications.

To start, we need to set up our Spring Boot application. We can do this by using Spring Initializr, a tool provided by Spring to quickly bootstrap a Spring Boot application. For our chat application, we will need to include the `Web`, `Kafka`, `WebSocket`, and `Lombok` dependencies.


Once our Spring Boot application is set up, we can start integrating it with Kafka. Kafka is a distributed streaming platform that is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, and incredibly fast, which makes it ideal for our real-time chat application.

First, let’s create the `Message` class and the `MessageType` enum. They represent the message object and the types of messages that can be sent.

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Message {
private MessageType type;
private String content;
private String sender;
private String sessionId;
}
public enum MessageType {
CHAT,
CONNECT,
DISCONNECT
}
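
Lombok generates the getters, setters, constructors, and builder at compile time. To make the builder part concrete, here is a hand-written approximation of it. The names `MessageSketch` and `MessageTypeSketch` are hypothetical and chosen so they do not clash with the real classes; this is a sketch of the pattern, not Lombok's actual output.

```java
// Hypothetical stand-ins for Message and MessageType, written without Lombok.
enum MessageTypeSketch { CHAT, CONNECT, DISCONNECT }

class MessageSketch {
    private final MessageTypeSketch type;
    private final String content;
    private final String sender;

    private MessageSketch(Builder builder) {
        this.type = builder.type;
        this.content = builder.content;
        this.sender = builder.sender;
    }

    // Entry point comparable to the static builder() method that @Builder adds.
    static Builder builder() {
        return new Builder();
    }

    MessageTypeSketch getType() { return type; }
    String getContent() { return content; }
    String getSender() { return sender; }

    // Fluent builder: each setter returns the builder so calls can be chained.
    static class Builder {
        private MessageTypeSketch type;
        private String content;
        private String sender;

        Builder type(MessageTypeSketch type) { this.type = type; return this; }
        Builder content(String content) { this.content = content; return this; }
        Builder sender(String sender) { this.sender = sender; return this; }

        MessageSketch build() { return new MessageSketch(this); }
    }

    public static void main(String[] args) {
        MessageSketch message = MessageSketch.builder()
                .type(MessageTypeSketch.CHAT)
                .content("Hello")
                .sender("alice")
                .build();
        System.out.println(message.getSender() + ": " + message.getContent());
    }
}
```

With Lombok in place, the real class supports the same chained style: `Message.builder().type(MessageType.CHAT).content("Hello").sender("alice").build()`.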

After that, create the `KafkaConsumerConfig` class:

@Configuration
public class KafkaConsumerConfig {

@Bean
public ConsumerFactory<String, Message> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "chat");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps, new StringDeserializer(),
new JsonDeserializer<>(Message.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

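Next, in our `KafkaProducerConfig` class, we configure our Kafka producer, which is responsible for sending messages to Kafka. We use the `KafkaTemplate` provided by Spring Kafka to send them. Note that this class reads the broker address from the `kafka.bootstrapAddress` property, so that property must be defined in the application configuration (the consumer configuration above hard-codes `localhost:9094` instead). Here’s what the `KafkaProducerConfig` class looks like: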

@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

@Bean
public ProducerFactory<String, Message> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}

In the next section, we will discuss how to use WebSocket for real-time communication between the server and the clients.

In this section, we will focus on the WebSocket configuration. WebSocket provides a full-duplex communication channel over a single connection, which is ideal for real-time data transfer between the client and the server.

In our `WebSocketConfig` class, we configure our WebSocket message broker. The message broker handles messages that are sent to and from the WebSocket clients. Here’s what the `WebSocketConfig` class looks like:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
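// The allowed origin is written without a trailing slash, as browsers send it in the Origin header.
registry.addEndpoint("/ws").setAllowedOrigins("http://localhost:5173").withSockJS();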
}
}

In this class:

- The `configureMessageBroker` method configures the message broker. It enables a simple in-memory message broker for destinations prefixed with “/topic” and sets “/app” as the prefix for messages that are bound for `@MessageMapping`-annotated methods.
- The `registerStompEndpoints` method registers the “/ws” endpoint and enables SockJS fallback options so that alternate transports can be used if WebSocket is not available.
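
To illustrate the effect of the two prefixes, here is a simplified model of the routing decision. It is not Spring's implementation; the class `DestinationRoutingSketch` and its `route` method are hypothetical and only mimic the prefix checks described above.

```java
import java.util.List;

// Simplified model of how the configuration above routes STOMP destinations.
// This is NOT Spring's code; class and method names are hypothetical.
class DestinationRoutingSketch {

    static final String APP_PREFIX = "/app";      // value passed to setApplicationDestinationPrefixes
    static final String BROKER_PREFIX = "/topic"; // value passed to enableSimpleBroker

    // Returns a short description of where a message sent to `destination` ends up.
    static String route(String destination) {
        if (destination.startsWith(APP_PREFIX)) {
            // The prefix is stripped before the rest is matched against @MessageMapping values.
            return "handler:" + destination.substring(APP_PREFIX.length());
        }
        if (destination.startsWith(BROKER_PREFIX)) {
            // Broker destinations are delivered to subscribers as-is.
            return "broker:" + destination;
        }
        return "unrouted:" + destination;
    }

    public static void main(String[] args) {
        for (String destination : List.of("/app/chat.send-message", "/topic/public", "/other")) {
            System.out.println(destination + " -> " + route(destination));
        }
    }
}
```

In other words, a client publishes to “/app/chat.send-message” to reach a method annotated with `@MessageMapping("/chat.send-message")`, and subscribes to “/topic/public” to receive broadcasts.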

Now that we have our WebSocket configured, we can create a `MessageController` to handle sending and receiving messages. The `MessageController` uses the `@MessageMapping` annotation to map the destination of incoming messages to specific methods.

First, let’s create the `Sender` class:

@Service
public class Sender {

private final KafkaTemplate<String, Message> kafkaTemplate;

public Sender(KafkaTemplate<String, Message> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

public void send(String topic, Message message) {
kafkaTemplate.send(topic, message);
}
}

And here’s what the `MessageController` class looks like:

@Controller
public class MessageController {

private final Sender sender;
private final SimpMessageSendingOperations messagingTemplate;
private static final Logger logger = LoggerFactory.getLogger(MessageController.class);

public MessageController(Sender sender, SimpMessageSendingOperations messagingTemplate) {
this.sender = sender;
this.messagingTemplate = messagingTemplate;
}

@MessageMapping("/chat.send-message")
public void sendMessage(@Payload Message chatMessage, SimpMessageHeaderAccessor headerAccessor) {
chatMessage.setSessionId(headerAccessor.getSessionId());
sender.send("messaging", chatMessage);
logger.info("Sending message to /topic/public: " + chatMessage);
messagingTemplate.convertAndSend("/topic/public", chatMessage);
logger.info("Message sent to /topic/public: " + chatMessage);
}

@MessageMapping("/chat.add-user")
@SendTo("/topic/public")
public Message addUser(
@Payload Message chatMessage,
SimpMessageHeaderAccessor headerAccessor
) {
if (headerAccessor.getSessionAttributes() != null) {
headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
}

return chatMessage;
}
}

In this class:

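- The `sendMessage` method handles messages sent to the “/chat.send-message” destination (“/app/chat.send-message” from the client’s point of view). It stores the sender’s session ID on the message, publishes the `Message` to the “messaging” Kafka topic through `Sender`, and also sends it to the “/topic/public” destination, broadcasting it to all subscribed WebSocket clients.
- The `addUser` method handles messages sent to the “/chat.add-user” destination. It stores the sender’s name in the WebSocket session attributes and, through `@SendTo`, broadcasts the message to “/topic/public”.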
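
The controller therefore hands each chat message to two paths: the Kafka topic and the direct WebSocket broadcast. The following in-memory sketch models only that flow. A `BlockingQueue` stands in for the Kafka topic and a list records deliveries; all names are hypothetical and no Kafka or Spring classes are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified, in-memory model of the two delivery paths in sendMessage.
// All names here are hypothetical; this is not how Kafka itself works internally.
class ChatFlowSketch {

    // Stand-in for the "messaging" Kafka topic.
    private final BlockingQueue<String> kafkaTopic = new LinkedBlockingQueue<>();
    // Records what would be delivered to clients, tagged with the path taken.
    private final List<String> delivered = new ArrayList<>();

    // Mirrors sendMessage: publish to the topic, then broadcast directly.
    void sendMessage(String content) {
        kafkaTopic.add(content);
        delivered.add("direct:" + content);
    }

    // Mirrors a listener draining the topic; returns how many messages it handled.
    int consumeAll() {
        int handled = 0;
        String next;
        while ((next = kafkaTopic.poll()) != null) {
            delivered.add("kafka:" + next);
            handled++;
        }
        return handled;
    }

    List<String> delivered() {
        return List.copyOf(delivered);
    }

    public static void main(String[] args) {
        ChatFlowSketch flow = new ChatFlowSketch();
        flow.sendMessage("hi");
        flow.consumeAll();
        System.out.println(flow.delivered());
    }
}
```

The same message can reach clients by two routes, which is worth keeping in mind when deciding whether the direct broadcast or the Kafka consumer should be responsible for delivery.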

In this section, we will discuss how to handle WebSocket events and how to consume messages from Kafka.

Handling WebSocket events is crucial for maintaining the state of our application. For instance, when a user connects or disconnects, we might want to broadcast a message to all other users to notify them of this event. In Spring, we can handle these events by creating an event listener class.

In our `WebSocketEventListener` class, we have methods that are annotated with `@EventListener`. These methods are automatically called when the corresponding event occurs. Here’s what the `WebSocketEventListener` class looks like:

@Component
public class WebSocketEventListener {

private static final Logger logger = LoggerFactory.getLogger(WebSocketEventListener.class);
private final SimpMessageSendingOperations messagingTemplate;

public WebSocketEventListener(SimpMessageSendingOperations messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}

@EventListener
public void handleWebSocketConnectListener(SessionConnectedEvent event) {
logger.info("Received a new web socket connection");
}

@EventListener
public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
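// Guard against missing session attributes, as addUser does (requires java.util.Map).
Map<String, Object> sessionAttributes = headerAccessor.getSessionAttributes();
String username = sessionAttributes != null ? (String) sessionAttributes.get("username") : null;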

if (username != null) {
logger.info("User Disconnected: " + username);
Message chatMessage = new Message();
chatMessage.setType(MessageType.DISCONNECT);
chatMessage.setSender(username);
messagingTemplate.convertAndSend("/topic/public", chatMessage);
}
}
}

In this class:

- The `handleWebSocketConnectListener` method is called when a new WebSocket connection is established. Here it only logs the event.
- The `handleWebSocketDisconnectListener` method is called when a WebSocket connection is closed. If a username was stored for the session, it sends a ‘DISCONNECT’ message to “/topic/public” so that the remaining clients are notified.
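
The link between `addUser` and the disconnect handler is the per-session attribute map: one writes the username, the other reads it back. Here is a minimal sketch of that idea using plain maps. `SessionAttributesSketch` and its methods are hypothetical and only imitate the behaviour; Spring manages the real session attributes.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of per-session attributes; not Spring's implementation.
class SessionAttributesSketch {

    // sessionId -> attributes of that session
    private final Map<String, Map<String, Object>> sessions = new ConcurrentHashMap<>();

    // Comparable to addUser: remember the username under the session.
    void addUser(String sessionId, String username) {
        sessions.computeIfAbsent(sessionId, id -> new ConcurrentHashMap<>())
                .put("username", username);
    }

    // Comparable to the disconnect handler: look the username up null-safely
    // and forget the session. An empty result means nothing should be broadcast.
    Optional<String> onDisconnect(String sessionId) {
        Map<String, Object> attributes = sessions.remove(sessionId);
        if (attributes == null) {
            return Optional.empty();
        }
        Object username = attributes.get("username");
        return username instanceof String ? Optional.of((String) username) : Optional.empty();
    }

    public static void main(String[] args) {
        SessionAttributesSketch registry = new SessionAttributesSketch();
        registry.addUser("session-1", "alice");
        System.out.println(registry.onDisconnect("session-1"));
        System.out.println(registry.onDisconnect("session-2"));
    }
}
```

Returning an empty `Optional` for unknown sessions corresponds to the `username != null` check in the listener: a session that never called `addUser` disconnects silently.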

Finally, we need to consume the messages from Kafka. In our `Receiver` class, we have a method that is annotated with `@KafkaListener`. This method is automatically called when a message is received from the Kafka topic. Here’s what the `Receiver` class looks like:

@Service
public class Receiver {

private static final Logger logger = LoggerFactory.getLogger(Receiver.class);
private final SimpMessageSendingOperations messagingTemplate;
private final SimpUserRegistry userRegistry;

public Receiver(SimpMessageSendingOperations messagingTemplate, SimpUserRegistry userRegistry) {
this.messagingTemplate = messagingTemplate;
this.userRegistry = userRegistry;
}

@KafkaListener(topics = "messaging", groupId = "chat")
public void consume(Message chatMessage) {
logger.info("Received message from Kafka: " + chatMessage);
for (SimpUser user : userRegistry.getUsers()) {
for (SimpSession session : user.getSessions()) {
if (!session.getId().equals(chatMessage.getSessionId())) {
messagingTemplate.convertAndSendToUser(session.getId(), "/topic/public", chatMessage);
}
}
}
}
}

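In this class, the `consume` method is called when a message is received from the “messaging” Kafka topic. It iterates over the sessions known to the `SimpUserRegistry` and forwards the received `Message` to every session except the one that originally sent it, using the user-specific “/topic/public” destination. Keep in mind that `SimpUserRegistry` tracks sessions by their user principal, so this loop only reaches sessions that have one.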
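
The core of `consume` is the rule "every session except the sender's". As a small illustration, the sketch below isolates that rule with plain strings. `RecipientFilterSketch` and `recipients` are hypothetical names, and the session IDs are made up.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that isolates the filtering rule used in consume().
class RecipientFilterSketch {

    // Returns all connected session IDs except the sender's.
    // A null senderSessionId (for example, a message not sent over WebSocket)
    // excludes nobody, just like the equals() check in the original loop.
    static List<String> recipients(List<String> connectedSessionIds, String senderSessionId) {
        return connectedSessionIds.stream()
                .filter(id -> !id.equals(senderSessionId))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> connected = List.of("a", "b", "c");
        System.out.println(recipients(connected, "b"));
        System.out.println(recipients(connected, null));
    }
}
```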

Frontend: Real-Time Messaging with React and STOMP over WebSocket

In this section, we’ll create a frontend application using React to interact with our backend service. We’ll use the @stomp/stompjs library to establish a STOMP connection over WebSocket.

First, let’s create the project. For this project I’m using WebStorm IDE.

Next, we’ll install the necessary libraries:

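npm install @stomp/stompjs sockjs-client @mui/material @emotion/react @emotion/styled react-avatar prop-types

Here, @stomp/stompjs is used for the STOMP client, sockjs-client provides the SockJS transport that matches the server’s `withSockJS()` endpoint, @mui/material (with its Emotion peer dependencies) is used for the UI components, react-avatar is used to display user avatars, and prop-types is used to validate component props.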

Creating the Username Page

In `UsernamePage.jsx`, we would typically create a form that allows users to enter their username. Once the username is submitted, it can be stored in a state and passed to the `ChatPage` component as a prop. This username will then be used as the sender of the messages.

import { useState } from 'react';
import { Button, TextField, Container, Box } from '@mui/material';
import PropTypes from 'prop-types';

function UsernamePage({ setUsername }) {
UsernamePage.propTypes = {
setUsername: PropTypes.func.isRequired,
};

const [inputUsername, setInputUsername] = useState('');

const handleUsernameSubmit = (event) => {
event.preventDefault();
if (inputUsername) {
setUsername(inputUsername);
}
};

return (
<Container>
<Box display="flex" flexDirection="column" justifyContent="center" alignItems="center" mt={2}>
<h1>Type your username</h1>
<form onSubmit={handleUsernameSubmit}>
<Box display="flex" alignItems="stretch">
<TextField
sx={{
color: 'white', '& .MuiOutlinedInput-notchedOutline': { borderColor: 'gray' },
width: '300px',
'& .MuiOutlinedInput-root': {
borderRadius: '36px',
'& fieldset': {
borderColor: 'gray',
},
'& input': {
height: '8px',
},
},
}}
inputProps={{ style: { color: 'white' } }}
variant="outlined"
placeholder="Username"
value={inputUsername}
onChange={(e) => setInputUsername(e.target.value)}
/>
<Box marginLeft={2}>
<Button
variant="contained"
sx={{
width: '94px',
height: '42px',
borderRadius: '36px',
}}
color="primary"
type="submit">
Enter
</Button>
</Box>
</Box>
</form>
</Box>
</Container>
);
}

export default UsernamePage;

In this code, UsernamePage takes a setUsername prop, which is a function to update the username in the parent component (App.jsx). It contains a form with a text field for the username and a submit button. When the form is submitted, it calls the `setUsername` function with the entered username.

In `App.jsx`, we would typically maintain the username state and conditionally render either the `UsernamePage` or the `ChatPage` based on whether a username has been set.

import { useState } from 'react';
import UsernamePage from './component/UsernamePage.jsx';
import ChatPage from './component/ChatPage.jsx';

function App() {
const [username, setUsername] = useState(null);

return (
<div>
{username ? <ChatPage username={username} /> : <UsernamePage setUsername={setUsername} />}
</div>
);
}

export default App;

In this code, App maintains a username state. If username is null, it renders `UsernamePage` and passes the `setUsername` function as a prop. If username is not null, it renders `ChatPage` and passes username as a prop.

Creating the Chat Page

In the `ChatPage.jsx` file, we’ll create a `ChatPage` component. This component will handle the WebSocket connection, send and receive messages, and render the chat interface.

import { useState, useEffect, useRef } from 'react';
import { Client } from '@stomp/stompjs';
import SockJS from 'sockjs-client/dist/sockjs';
import ChatMessage from "./ChatMessage.jsx";
import { Button, TextField, Container, Box } from '@mui/material';

function ChatPage({ username }) {
const [messages, setMessages] = useState([]);
const [client, setClient] = useState(null);
const messageInputRef = useRef();

useEffect(() => {
const newClient = new Client({
webSocketFactory: () => new SockJS('http://localhost:8080/ws'),
onConnect: () => {
const joinMessage = {
sender: username,
type: 'CONNECT',
};
newClient.publish({ destination: '/app/chat.add-user', body: JSON.stringify(joinMessage) });
newClient.subscribe('/topic/public', message => {
const newMessage = JSON.parse(message.body);
setMessages(prevMessages => [...prevMessages, newMessage]);
});
},
onDisconnect: () => {
if (newClient.connected) {
const leaveMessage = {
sender: username,
type: 'DISCONNECT',
};
newClient.publish({ destination: '/app/chat.add-user', body: JSON.stringify(leaveMessage) });
}
},
});

newClient.activate();
setClient(newClient);

return () => {
newClient.deactivate();
};
}, [username]);

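const sendMessage = (event) => {
// Prevent the browser's default form submission, which would reload the page.
event.preventDefault();
if (messageInputRef.current.value && client) {
const chatMessage = {
sender: username,
content: messageInputRef.current.value,
type: 'CHAT',
};
// The destination must match @MessageMapping("/chat.send-message") on the server.
client.publish({ destination: '/app/chat.send-message', body: JSON.stringify(chatMessage) });
messageInputRef.current.value = '';
}
};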

return (
<Container>
<Box>
{messages.map((message, index) => (
<ChatMessage key={index} message={message} username={username} />
))}
</Box>
<form onSubmit={sendMessage}>
<TextField inputRef={messageInputRef} placeholder="Type a message..." />
<Button type="submit">Send</Button>
</form>
</Container>
);
}

export default ChatPage;

In this code, we’re using the useState and useEffect hooks from React to manage the state and side effects of the component. We’re using the Client class from @stomp/stompjs to create a STOMP client over a WebSocket connection. We’re using the publish method to send messages to the server and the subscribe method to receive messages from the server.
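
Under the hood, each `publish` call becomes a STOMP SEND frame: a command line, header lines, a blank line, the body, and a terminating NUL byte. As a rough illustration, written in Java to stay in the backend's language, the sketch below assembles such a frame by hand. `StompFrameSketch` is a hypothetical helper; real clients usually add further headers such as `content-length`, and SockJS wraps frames in its own envelope, both of which are ignored here.

```java
// Hypothetical helper that builds a minimal STOMP SEND frame as a string.
// It ignores optional headers and any SockJS framing.
class StompFrameSketch {

    static String sendFrame(String destination, String jsonBody) {
        return "SEND\n"                               // command
                + "destination:" + destination + "\n" // where the message should go
                + "content-type:application/json\n"   // describes the body
                + "\n"                                // blank line separates headers from body
                + jsonBody
                + '\0';                               // a NUL character terminates the frame
    }

    public static void main(String[] args) {
        String frame = sendFrame(
                "/app/chat.send-message",
                "{\"sender\":\"alice\",\"content\":\"Hello\",\"type\":\"CHAT\"}");
        // Replace the NUL terminator so the frame prints cleanly.
        System.out.println(frame.replace("\0", "^@"));
    }
}
```

The `destination` header is what the server compares against the “/app” and “/topic” prefixes configured earlier, which is why the client and server destination strings have to agree exactly.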

Creating the Chat Message Component

In the `ChatMessage.jsx` file, we’ll create a ChatMessage component. This component will display a single chat message.

import Avatar from 'react-avatar';
import { Box } from '@mui/material';

function ChatMessage({ message, username }) {
return (
<Box sx={{ display: 'flex', flexDirection: 'column', alignItems: message.sender === username ? 'flex-end' : 'flex-start', margin: '10px 0' }}>
<Box sx={{ display: 'flex', flexDirection: message.sender === username ? 'row-reverse' : 'row', alignItems: 'center', gap: 1 }}>
<Avatar name={message.sender} size="35" round={true} />
<h4>{message.sender}</h4>
</Box>
<Box sx={{
backgroundColor: message.sender === username ? 'primary.main' : 'secondary.main',
color: 'white',
borderRadius: '12px',
padding: '10px',
maxWidth: '80%',
}}>
<p>{message.content}</p>
</Box>
</Box>
);
}

export default ChatMessage;

In this code, we’re using the Box component from @mui/material to create a flexible layout, and the Avatar component from react-avatar to display the user avatar. We’re using the sx prop to apply styles to the components.

In this section, we’ve created a frontend application using React that interacts with our backend service over a WebSocket connection. We’ve used the @stomp/stompjs library to establish a STOMP connection over WebSocket, and the @mui/material library to create the UI.

Let’s also try it out with the command line by creating the `CommandController` class:

@RestController
public class CommandController {

private final KafkaTemplate<String, Message> kafkaTemplate;
private final SimpMessageSendingOperations messagingTemplate;

public CommandController(KafkaTemplate<String, Message> kafkaTemplate, SimpMessageSendingOperations messagingTemplate) {
this.kafkaTemplate = kafkaTemplate;
this.messagingTemplate = messagingTemplate;
}

@PostMapping("/send")
public void send(@RequestBody Message message) {
kafkaTemplate.send("messaging", message);
messagingTemplate.convertAndSend("/topic/public", message);
}
}

After that, run these commands in the terminal:

curl -X POST -H "Content-Type: application/json" -d "{\"type\":\"CONNECT\",\"content\":\"Hello, World!\",\"sender\":\"Command Line\"}" http://localhost:8080/send

curl -X POST -H "Content-Type: application/json" -d "{\"type\":\"CHAT\",\"content\":\"Hello, World!\",\"sender\":\"Command Line\"}" http://localhost:8080/send
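
If you prefer to trigger the endpoint from code rather than curl, the same request can be built with the JDK's built-in HTTP client (Java 11 or later). This is a minimal sketch: `SendCommandSketch` is a hypothetical class, the JSON is assembled by simple concatenation and assumes the values contain no characters that need escaping, and `send` only works while the server from this article is running on `localhost:8080`.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical command-line helper mirroring the curl calls above.
class SendCommandSketch {

    // Builds the POST request; values are assumed not to need JSON escaping.
    static HttpRequest buildRequest(String baseUrl, String type, String content, String sender) {
        String json = "{\"type\":\"" + type + "\","
                + "\"content\":\"" + content + "\","
                + "\"sender\":\"" + sender + "\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/send"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // Sends the request and returns the HTTP status code. Requires a running server.
    static int send(HttpRequest request) throws Exception {
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest("http://localhost:8080", "CHAT", "Hello, World!", "Command Line");
        // Only prints the request; call send(request) once the backend is up.
        System.out.println(request.method() + " " + request.uri());
    }
}
```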

The messages should then appear in the chat window of the connected clients as well.

By combining Spring Boot, Kafka, and WebSocket, we have built a real-time chat application that is scalable and efficient. We hope this article has provided you with a solid understanding of how these technologies can be used together to build real-time applications. Happy coding!

GitHub repositories:

server-side

client-side
