We often encounter projects that involve processing real-time data streams. In this blog post, we will explore a hands-on example of building a real-time stock market analytics system. We will leverage the power of Kafka, Kafka Streams, Redis, and Spring Boot to design and implement a scalable and fault-tolerant stream processing system. This project will not only enhance your understanding of real-time data processing but also introduce you to key technologies widely used in the industry.
Note: The complete code for this post is available in my GitHub repository, stock-market-analytics.
The goal of our project is to build a system that can process streaming data from the stock market in real-time and perform analytics to generate insights. Specifically, we will calculate the 5-day moving average of stock prices and store the results for further analysis.
Technology Stack:
- Apache Kafka: A distributed messaging system that allows us to handle large-scale streaming data, ensuring fault tolerance, scalability, and high throughput.
- Kafka Streams: A powerful client library built on top of Kafka, providing a high-level abstraction for building real-time stream processing applications.
- Redis: An in-memory data store that enables fast read and write operations. We will use Redis to store intermediate and frequently accessed data, improving overall system performance.
- Spring Boot: A popular Java framework that simplifies the development of web applications and provides seamless integration with Kafka, Kafka Streams, and Redis.
Data Ingestion
We will use the AlphaVantage API to fetch stock market data, including stock quotes and daily time series data. This data will serve as our input stream for further processing.
We use a scheduler in our project to fetch the stock data at regular intervals. For demo purposes, I call it every 15 seconds, but once a day would be enough, since the API only returns data up to the previous day.
Specifically, we call the TIME_SERIES_DAILY_ADJUSTED API with the compact output size. Below is the code snippet that makes the WebClient call.
public Mono<QuoteTimeSeriesResponse> getTimeSeriesDailyAdjusted(String tickerId) {
    return webClient
            .get()
            .uri(uriBuilder -> uriBuilder
                    .path("/query/")
                    .queryParam("function", "TIME_SERIES_DAILY_ADJUSTED")
                    .queryParam("symbol", tickerId)
                    .queryParam("outputsize", "compact")
                    .queryParam("apikey", apiKey)
                    .build())
            .retrieve()
            .bodyToMono(QuoteTimeSeriesResponse.class);
}
Please sign up on the AlphaVantage website to get your own API key.
Once we have the data, we publish it to the Kafka topic TIME_SERIES_DAILY.
Below is the code snippet that does this.
@Scheduled(fixedRate = 15000)
public void publishTimeSeriesStockData() {
    log.info("Calculating time series data for IBM");
    String quote = "IBM";
    // This adds ZSET records in Redis, sorted by the timestamp field.
    RTimeSeries<Double, String> ts = redissonClient.getTimeSeries("stock-series");
    alphaVantageWebClient.getTimeSeriesDailyAdjusted(quote).subscribe(tsda -> {
        tsda.getMetadataResponse();
        // Take the first five entries of the response and publish each one.
        tsda.getTimeSeriesDaily().entrySet().stream().limit(5).forEach(e -> {
            String dayOfClosing = e.getKey().toString();
            // Treat each closing day as 23:59 in the America/St_Johns time zone.
            ZonedDateTime closingDayInZonedDayTime = LocalDate.parse(dayOfClosing)
                    .atTime(23, 59)
                    .atZone(ZoneId.of("America/St_Johns"));
            TimeSeriesDaily tsd = e.getValue();
            TimeSeriesDailyQuote t = new TimeSeriesDailyQuote(quote,
                    tsd.getOpen(),
                    tsd.getHigh(),
                    tsd.getLow(),
                    tsd.getClose(),
                    tsd.getAdjustedClose(),
                    tsd.getVolume(),
                    tsd.getDividendAmount(),
                    tsd.getSplitCoefficient(),
                    String.valueOf(closingDayInZonedDayTime.toInstant().toEpochMilli()));
            log.info("Logging time series data: {}", t);
            // Store the closing price in Redis and publish the full quote to Kafka.
            ts.add(closingDayInZonedDayTime.toInstant().toEpochMilli(), t.getClose());
            kafkaTemplate.send(TIME_SERIES_DAILY_TOPIC, t.getSymbol(), t);
        });
    });
}
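The date handling in the scheduler is easy to get wrong, so here is a minimal, standalone sketch of just that step: turning a closing date string into the epoch-millisecond timestamp used for both Redis and Kafka. The class and method names are hypothetical and not part of the project; only the java.time calls mirror the snippet above.

```java
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical helper, for illustration only.
class ClosingTimestamp {

    // Converts an ISO date such as "2023-06-01" to epoch milliseconds
    // at 23:59 local time in the given time zone.
    static long toEpochMillis(String isoDate, String zoneId) {
        ZonedDateTime closing = LocalDate.parse(isoDate)
                .atTime(23, 59)
                .atZone(ZoneId.of(zoneId));
        return closing.toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Same zone as the scheduler code above.
        System.out.println(toEpochMillis("2023-06-01", "America/St_Johns"));
    }
}
```

The chosen zone shifts the resulting timestamp by its UTC offset, which matters later when records are assigned to time windows.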
Stream Processing with Kafka Streams
Using the Kafka Streams API, we will define a processing topology to calculate the 5-day moving average of stock prices. We will leverage windowing and aggregation techniques provided by Kafka Streams to perform real-time analytics on the data stream.
Below is a step-by-step explanation of the stream processing in TimeSeriesQuoteConsumer.
Step 1: Consuming Events from Source Topic
We create a KStream named quoteDailyEvents by consuming events from the Kafka topic specified by the TIME_SERIES_DAILY_TOPIC constant. The Consumed.with() method sets a custom timestamp extractor called FiveDayTrailingTimeStampExtractor, which extracts timestamps from the consumed records.
KStream<String, TimeSeriesDailyQuote> quoteDailyEvents =
        builder.stream(TIME_SERIES_DAILY_TOPIC, Consumed.with(new FiveDayTrailingTimeStampExtractor()));
Step 2: Defining a Time Window of 5 Days
We define a time window using the TimeWindows.ofSizeWithNoGrace() function, with the window size set to 5 days. Because no advance interval is specified, this is a tumbling window: the windows are fixed-size and non-overlapping, so each event is assigned to exactly one window and nothing is double-counted during aggregation. Note that this produces one average per distinct 5-day block rather than a rolling average that updates every day; a true rolling 5-day average would need overlapping (hopping) windows that advance by one day. The tumbling window keeps this example simple. Using no grace period also means that records arriving after their window has closed are dropped.
TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofDays(5));
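To make the window boundaries concrete, here is a minimal plain-Java sketch (no Kafka dependency) of how a record timestamp maps to a 5-day tumbling window. It relies on the fact that Kafka Streams aligns time windows to the epoch. The class and method names are hypothetical and exist only for illustration.

```java
import java.time.Duration;

// Hypothetical illustration of epoch-aligned tumbling windows.
class TumblingWindowSketch {

    static final long WINDOW_SIZE_MS = Duration.ofDays(5).toMillis();

    // Start (inclusive) of the tumbling window that contains the timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    // End (exclusive) of the tumbling window that contains the timestamp.
    static long windowEnd(long timestampMs) {
        return windowStart(timestampMs) + WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        long dayMs = Duration.ofDays(1).toMillis();
        // Days 0 to 4 after the epoch share one window; day 5 starts the next.
        for (int day = 0; day <= 6; day++) {
            System.out.println("day " + day + " -> window starting at " + windowStart(day * dayMs));
        }
    }
}
```

Because the boundaries are fixed relative to the epoch rather than to the first record, five consecutive closing dates can fall into two different windows, and weekends mean a 5-calendar-day window often holds fewer than five trading days.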
Step 3: Aggregating Data within the Time Window
We perform aggregation on the quoteDailyEvents stream. The stream is grouped by key with groupByKey(), windowed with windowedBy() using the tumblingWindow defined earlier, and then aggregated with aggregate(). The aggregate() function takes two parameters: an initializer that supplies the starting value of the aggregate (new StockCountSum(0L, 0.0)) and a lambda function that defines how to update the aggregate for each incoming record.
quoteDailyEvents
        .groupByKey()
        .windowedBy(tumblingWindow)
        .aggregate(() -> new StockCountSum(0L, 0.0), (key, value, aggregate) -> {
            aggregate.setCount(aggregate.getCount() + 1);
            aggregate.setSum(aggregate.getSum() + value.getClose());
            return aggregate;
        })
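The count-and-sum pattern used in the aggregator can be tried outside Kafka. The sketch below is a plain-Java stand-in: CountSum is a hypothetical simplification of StockCountSum, and the loop plays the role of the aggregator lambda being called once per record in a window.

```java
// Hypothetical stand-in for the windowed aggregation, without Kafka.
class CountSumSketch {

    // Simplified version of the StockCountSum aggregate.
    static final class CountSum {
        long count;
        double sum;

        // Mirrors the aggregator lambda: one more record, add its closing price.
        CountSum add(double close) {
            count++;
            sum += close;
            return this;
        }

        // Average of all closing prices seen so far; 0.0 if none were added.
        double average() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    // Starts from the initial value (0, 0.0) and folds every closing price in.
    static CountSum aggregate(double... closes) {
        CountSum acc = new CountSum();
        for (double close : closes) {
            acc.add(close);
        }
        return acc;
    }

    public static void main(String[] args) {
        CountSum result = aggregate(10.0, 20.0, 30.0);
        System.out.println(result.count + " records, average " + result.average());
    }
}
```

Keeping the running count and sum, rather than the average itself, is what allows the aggregate to be updated incrementally as each record arrives.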
Step 4: Transforming the Aggregated Data
After the aggregation, we transform the result using the map() function. For each window, we calculate the average as the sum divided by the count, format it to four decimal places using String.format(), and wrap it in a new FiveDaysAverage object. We then return a new KeyValue pair that keeps the original record key (key.key()) and carries the transformed object as its value.
        // aggregate() returns a KTable, so convert it to a KStream before calling map()
        .toStream()
        .map((Windowed<String> key, StockCountSum stockAverage) -> {
            double aveNoFormat = stockAverage.getSum() / (double) stockAverage.getCount();
            double formattedAve = Double.parseDouble(String.format("%.4f", aveNoFormat));
            FiveDaysAverage fda = new FiveDaysAverage(formattedAve);
            return new KeyValue<>(key.key(), fda);
        })
        .to("5_days_moving_average");
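One caveat about the rounding step: String.format() uses the JVM's default locale, so in a locale that uses a comma as the decimal separator the formatted string cannot be parsed back by Double.parseDouble(). As a possible alternative, not what the project does, here is a minimal sketch that rounds with BigDecimal instead. The class and method names are hypothetical.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical locale-independent alternative to String.format + parseDouble.
class AverageRounding {

    // Rounds a finite value to four decimal places, half up.
    // BigDecimal.valueOf throws NumberFormatException for NaN or infinity,
    // so callers should make sure the count is non-zero before dividing.
    static double roundTo4(double value) {
        return BigDecimal.valueOf(value)
                .setScale(4, RoundingMode.HALF_UP)
                .doubleValue();
    }

    public static void main(String[] args) {
        System.out.println(roundTo4(123.456789));
    }
}
```

Passing Locale.ROOT as the first argument to String.format() would be another way to get the same locale independence while keeping the original approach.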
Step 5: Writing the Transformed Data to Output Topic
Finally, the to() call at the end of the previous snippet sends the transformed stream to the “5_days_moving_average” Kafka topic. The data is written to that topic, where it is available for further processing or analysis.
Caching in Redis
Here, I am simply demonstrating the use of a Redis time series structure (Redisson's RTimeSeries) to store stock closing prices by closing date. If needed, this data can be displayed in Grafana to graph a stock's simple moving average over the past 5 days.
Avro schemas
If you look at the KafkaConfig class, you will see that I use Avro to serialize and deserialize the data written to and read from Kafka topics. Avro serialization keeps the data exchanged with Kafka compact and schema-aware. Avro’s schema evolution rules also let producers and consumers stay compatible as the data schema changes over time, provided the changes follow those rules (for example, adding a field with a default value). This supports interoperability between the different components and systems that use Avro for serialization and deserialization.