Building real-time data pipelines with Kafka Streams II

Beloved coders,

as we promised in our previous post, we’ll show you how to build a simple, event-based service based on Spring Boot and Kafka Streams that uses several of the more powerful features of this technology: windows and key/value stores. We are basing our simplified example on a real scenario we’ve faced at Datio.

The problem

Let’s imagine we want to collect metrics from the docker containers we have deployed in our servers. Fortunately, our sysadmins have installed Telegraf agents in all our machines in an effort to get the whole system monitored. Telegraf is an agent for collecting and reporting metrics that provides a docker plugin to extract metrics from the docker daemon and a Kafka plugin to send them to our cluster… so it seems we have all we need in order to obtain a time series of the resources consumed by our docker containers.

However, we have realized there are two little obstacles we have to be aware of. The Telegraf agent doesn’t send a single event with all the metrics of a given container. Instead, it sends one event per metric. Let’s imagine that the events look like this:

// Memory metric event
{
 "idContainer": "id1",
 "timestamp": 10000,
 "metrics": {
   "container_memory_limit": X,
   "container_memory_usage": X,
   "container_memory_usage_percent": X
 }
}
// CPU usage event
{
 "idContainer": "id1",
 "timestamp": 10000,
 "metrics": {
   "container_cpu_limit": X,
   "container_cpu_usage": X
 }
}

In addition to this, we realize that the CPU metric is reported in seconds, but our sysadmins would like to get this metric as a percentage. The desired output contains all the metrics merged, and it would look like this:

{
 "idContainer": "id1",
 "timestamp": 10000,
 "metrics": {
   "container_cpu_limit": X,
   "container_memory_usage_percent": X,
   "container_memory_usage": X,
   "container_memory_limit": X,
   "container_cpu_usage": X,
   "container_cpu_usage_percent": X
 }
}

Let’s see how we can help our system administrators using Kafka Streams…

The solution

Architecture

As we’ve seen before, the architecture of our solution will include Docker, Telegraf, Kafka and a little metrics normalizer coded with Kafka Streams. Sexy, isn’t it?

In order to get a scalable system we must ensure that all the metrics of a given docker container are received in the same partition of a Kafka topic, so the key of all the metrics sent by Telegraf will be the hostname where the container is running. The more containers and servers we have in our production environment, the more partitions can be created.
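Telegraf takes care of producing these records for us, but conceptually it is equivalent to publishing every metric with the hostname as the record key, so that Kafka’s default partitioner (a hash of the key modulo the number of partitions) always routes the metrics of a host’s containers to the same partition. A minimal sketch of that idea, with the broker address and topic name invented for illustration:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MetricPublisherSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");            // illustrative broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String hostname = "server-01";                        // key: the host running the containers
            String metricJson = "{\"idContainer\":\"id1\"}";      // value: a raw metric event (shortened)
            // Same key -> same partition, so all metrics coming from this host stay together
            producer.send(new ProducerRecord<>("telegraf.docker.metrics", hostname, metricJson));
        }
    }
}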

Streaming pipeline

What will be our strategy to get all the metrics of a container in a single event?

Looking carefully at the input messages, we realized that all the events collected in the same instant are received with the same timestamp, so we can group them into windows based on the timestamp and the container id. The length of these windows must be shorter than the interval between metrics sent by the Telegraf agent.

How can we calculate the percentage of cpu usage?

This measure can’t be calculated from a single metric, so we will have to use two consecutive metrics to obtain it. The key/value stores of Kafka Streams seem a good fit for this problem.

The code

Let’s see, step by step, the most relevant points of the code.

Events model

In order to represent the input and output events we have created two POJOs: one for the raw metrics sent by Telegraf, and one for the desired normalized metric. The only relevant methods we added to these classes are the following:

We must be able to merge two metric events received in the same window:

@JsonProperty("idContainer")
private String idContainer;
@JsonProperty("timestamp")
private Long timestamp;
@JsonProperty("metrics")
private HashMap<String,Float> metrics;
...
public void merge(RawMetric newMetric){
       this.idContainer = newMetric.getIdContainer();
       this.timestamp = newMetric.getTimestamp();
       for(String literal : newMetric.metrics.keySet()){
           if(! metrics.containsKey(literal)){
               metrics.put(literal,newMetric.metrics.get(literal));
           }
       }
}
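For instance, merging the memory event and the CPU usage event shown in the problem statement produces a single RawMetric whose metrics map contains the five raw measurements; the sixth field of the desired output, container_cpu_usage_percent, is calculated later in the pipeline.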

We must be able to calculate the CPU usage percentage between two consecutive metrics with the following method:

@JsonProperty("idContainer")
private String idContainer;
@JsonProperty("timestamp")
private Long timestamp;
private Float memoryLimit;
private Float memoryUsage;
private Float memoryUsagePercent;
private Float cpuLimit;
private Float cpuUsage;
private Float cpuUsagePercent;
public static float calculateCPUUsagePercent(RawMetric oldM, RawMetric newM){
        // CPU usage is reported in seconds, so the percentage consumed over the interval is
        // 100 * (usage delta) / (time delta)
        return 100 * (newM.getMetrics().get(RawMetric.CPU_USAGE) - oldM.getMetrics().get(RawMetric.CPU_USAGE))
                / (newM.getTimestamp() - oldM.getTimestamp());
}

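For example, assuming timestamps expressed in seconds, if a container had consumed 120 seconds of CPU at timestamp 10000 and 150 seconds at timestamp 10060, the CPU usage over that interval would be 100 * (150 - 120) / (10060 - 10000) = 50%.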
Pipeline

The streaming pipeline will be composed of the following steps:

Parsing the input events

First of all, we must parse the received data into the objects we’ve specified before. As we discussed in the previous post, Kafka Streams follows a fail-fast policy, and we don’t want a malformed event to kill our processing pipeline, so take a look at the pattern we’ve used to avoid that situation:

private KStream<String, RawMetric> parseRawMetrics(KStream<String, String> inputStream) {
       EventHelper<RawMetric> rawMetricEventHelper = new EventHelper<RawMetric>(RawMetric.class);
       return inputStream.flatMapValues(str->{
           try {
               return Collections.singletonList(rawMetricEventHelper.extract(str));
           } catch (IOException e) {
               logger.error("Error parsing event: "+e.getMessage());
               return Collections.emptyList();
           }
        });
    }

We are using flatMapValues instead of a map transformation so that we can return an empty list to the stream when we are not able to parse the input message, silently discarding it.
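The EventHelper class is not listed in this post (you can find the real one in the linked project); presumably it is little more than a thin wrapper around Jackson’s ObjectMapper, along these lines:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

// Hypothetical sketch of EventHelper: parse and serialize events of a given POJO class
public class EventHelper<T> {

    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> clazz;

    public EventHelper(Class<T> clazz) {
        this.clazz = clazz;
    }

    // Parses a JSON string into an instance of T; throws IOException on malformed input
    public T extract(String json) throws IOException {
        return mapper.readValue(json, clazz);
    }

    // Serializes an instance of T back to JSON (used for logging in the transformer)
    public String toString(T value) throws JsonProcessingException {
        return mapper.writeValueAsString(value);
    }
}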

Grouping single metrics

The next step of our pipeline involves grouping all the metrics received within the same time window into a single event. As shown in the next figure, the key chosen to group the events will be a combination of the container id and the timestamp of the metric:


This approach works because we chose the hostname of the server as the key of the input stream, so all the metrics of a given container are received in the same partition and processed by the same stream task.

Let’s take a look at the code used for the windowing, and how we are using the merge method we created in the RawMetric POJO.

private KStream<String,RawMetric> groupMetrics(KStream<String,RawMetric> metricStream) {
        Serde<RawMetric> rawMetricSerde = SerdeFactory.createSerde(RawMetric.class);
        KTable<Windowed<String>, RawMetric> groupedMetrics = metricStream
                // Re-key every metric with "containerId-timestamp" so that metrics taken at the
                // same instant for the same container fall into the same group
                .groupBy((key, metric) -> metric.getIdContainer() + "-" + metric.getTimestamp(),
                        Serialized.with(Serdes.String(), rawMetricSerde))
                .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(windowSizeSeconds)))
                .aggregate(
                        () -> new RawMetric(),
                        (String aggKey, RawMetric newMetric, RawMetric aggMetric) -> {
                            aggMetric.merge(newMetric);
                            return aggMetric;
                        },
                        Materialized.<String, RawMetric, WindowStore<Bytes, byte[]>>as("GROUPING.WINDOW")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(rawMetricSerde)
                );
        // Back to a plain stream, keyed again by container id
        return groupedMetrics.toStream().map((key, metric) -> KeyValue.pair(metric.getIdContainer(), metric));
    }

The stream resulting from this aggregation will include one event per window, containing all the metrics received from a single container. For instance, if the Telegraf agents are configured to send metrics every minute, a window length of 30 seconds may be a good option.
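The SerdeFactory used above isn’t shown in the post either; a minimal sketch, assuming it builds a JSON serde for our POJOs on top of Jackson and Serdes.serdeFrom, could look like this:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

// Hypothetical sketch of SerdeFactory: JSON (de)serialization of our event POJOs
public class SerdeFactory {

    public static <T> Serde<T> createSerde(Class<T> clazz) {
        ObjectMapper mapper = new ObjectMapper();

        Serializer<T> serializer = new Serializer<T>() {
            @Override public void configure(Map<String, ?> configs, boolean isKey) { }
            @Override public byte[] serialize(String topic, T data) {
                try {
                    return data == null ? null : mapper.writeValueAsBytes(data);
                } catch (Exception e) {
                    throw new RuntimeException("Error serializing " + clazz.getSimpleName(), e);
                }
            }
            @Override public void close() { }
        };

        Deserializer<T> deserializer = new Deserializer<T>() {
            @Override public void configure(Map<String, ?> configs, boolean isKey) { }
            @Override public T deserialize(String topic, byte[] data) {
                try {
                    return data == null ? null : mapper.readValue(data, clazz);
                } catch (Exception e) {
                    throw new RuntimeException("Error deserializing " + clazz.getSimpleName(), e);
                }
            }
            @Override public void close() { }
        };

        return Serdes.serdeFrom(serializer, deserializer);
    }
}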

Calculating CPU percentage

As we mentioned before, we need two consecutive metrics in order to calculate the CPU usage percentage consumed by a container, using the following formula:

cpu_usage_percent = 100 * (cpu_usage_new - cpu_usage_old) / (timestamp_new - timestamp_old)

That means that every time we receive a metric, we must use the previous one to calculate this percentage. It seems a very good scenario to use the key/value stores provided by Kafka Streams. Another typical use for this kind of structure is deduplication when working with non-idempotent data.

The first step to use a store is adding it to the StreamsBuilder. In this case we are using an in-memory store, as we don’t mind losing some data, but other kinds (persistent stores, window stores…) are available in case your business scenario requires a better QoS.

Serde<RawMetric> rawMetricSerde = SerdeFactory.createSerde(RawMetric.class);
StoreBuilder<KeyValueStore<String,RawMetric>> metricsStoreBuilder = Stores.keyValueStoreBuilder(
               Stores.inMemoryKeyValueStore("METRICS_STORE"),
               Serdes.String(),
               rawMetricSerde
       );
builder.addStateStore(metricsStoreBuilder);

Using stores requires the low-level Processor API of Kafka Streams. This API provides us with two kinds of nodes:

  • Processors. We use these nodes when we are not interested in returning anything to the stream. This may be useful when we want to send data to an external system, for instance a database.
  • Transformers. We use these nodes when we are interested in returning events to the stream. We use the data in the store to transform the received events and return (or not) a new object to the stream.
In our example, we will use the transformer API to calculate the normalized metric with the percentage of CPU included:

KStream<String,NormalizedMetric> normalizedMetrics = groupedMetrics.transform(MetricsTransformer::new,"METRICS_STORE");
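The MetricsTransformer class itself isn’t declared in the snippets of this post; presumably it implements the Transformer interface of the Processor API, roughly as in the following skeleton (the logger and the JSON helper used for logging are omitted, and init and transform are detailed right below):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical skeleton: receives grouped RawMetric events and emits NormalizedMetric events
public class MetricsTransformer implements Transformer<String, RawMetric, KeyValue<String, NormalizedMetric>> {

    private ProcessorContext context;
    private KeyValueStore<String, RawMetric> store;

    @Override
    public void init(ProcessorContext processorContext) {
        // shown in the next snippet
    }

    @Override
    public KeyValue<String, NormalizedMetric> transform(String key, RawMetric rawMetric) {
        // shown in the last snippet of this section
        return null;
    }

    @Override
    public void close() {
        // nothing to release: the state store is managed by Kafka Streams
    }
}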

Our transformer receives a RawMetric object and returns a NormalizedMetric one. We use the init method to access the key/value store, and we can also schedule actions to be executed periodically, like cleaning old metrics from the store.

@Override
   public void init(ProcessorContext processorContext) {
       this.context = processorContext;
       this.store = (KeyValueStore<String,RawMetric>) this.context.getStateStore("METRICS_STORE");
        // We can periodically delete all metrics from the store. Worst case, we will lose
        // one calculated metric right after the store has been emptied
       this.context.schedule(60000,PunctuationType.WALL_CLOCK_TIME, (timestamp)->{
           eraseCacheInfo();
       });
   }
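The eraseCacheInfo method isn’t listed here; a minimal sketch, assuming it simply wipes the store (it requires java.util.ArrayList, java.util.List and org.apache.kafka.streams.state.KeyValueIterator), could be:

// Hypothetical sketch: delete every entry of the key/value store
private void eraseCacheInfo() {
    List<String> keys = new ArrayList<>();
    try (KeyValueIterator<String, RawMetric> it = store.all()) {
        while (it.hasNext()) {
            keys.add(it.next().key);
        }
    }
    // Delete outside the iteration to avoid modifying the store while iterating over it
    for (String key : keys) {
        store.delete(key);
    }
}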

In the transform method, we have all the logic required to calculate our normalized metric. We look for a previous metric, calculate the result and store the new one. If no previous metric is found in the store, we can’t return anything, but we keep the received one so it can be used in the next iteration.

@Override
   public KeyValue<String, NormalizedMetric> transform(String key, RawMetric rawMetric) {
      if(store.get(rawMetric.getIdContainer()) != null){
          try {
              logger.info("Calculating normalized metric "+helper.toString(rawMetric));
          } catch (JsonProcessingException e) {
              e.printStackTrace();
              return null;
          }
          // Calculate Normalized metric
          NormalizedMetric metric = new NormalizedMetric();
          metric.setIdContainer(rawMetric.getIdContainer());
          metric.setTimestamp(rawMetric.getTimestamp());
          metric.setCpuLimit(rawMetric.getMetrics().get(RawMetric.CPU_LIMIT));
          metric.setCpuUsage(rawMetric.getMetrics().get(RawMetric.CPU_USAGE));
          metric.setMemoryLimit(rawMetric.getMetrics().get(RawMetric.MEMORY_LIMIT));
          metric.setMemoryUsage(rawMetric.getMetrics().get(RawMetric.MEMORY_USAGE));
          metric.setMemoryUsagePercent(rawMetric.getMetrics().get(RawMetric.MEMORY_USAGE_PERCENT));
          metric.setCpuUsagePercent(NormalizedMetric.calculateCPUUsagePercent(store.get(rawMetric.getIdContainer()),rawMetric));
           // store the current metric
          store.put(rawMetric.getIdContainer(),rawMetric);
          return KeyValue.pair(key,metric);
      }else{
           // No previous metric: store the current one, but there is nothing to return yet
          try {
              logger.info("No previous metric. Storing metric "+helper.toString(rawMetric));
          } catch (JsonProcessingException e) {
              e.printStackTrace();
              return null;
          }
          store.put(rawMetric.getIdContainer(),rawMetric);
          return null;
      }
   }

Initialization

The last step is including our topology in the KafkaStreams object and starting it when the Spring Boot application starts:

@PostConstruct
   public void runStream(){
       stream = new KafkaStreams(topologyBuilder.defineTopology(),configRetriever.getConfig());
       stream.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
           @Override
           public void uncaughtException(Thread thread, Throwable throwable) {
               logger.error("Uncaught exception: "+throwable.getMessage());
               closeStream();
               Thread.currentThread().interrupt();
               Runtime.getRuntime().exit(1);
           }
       });
       stream.start();
       //Graceful shutdown
       Runtime.getRuntime().addShutdownHook(new Thread(this::closeStream));
   }
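The configRetriever bean isn’t shown in this post; at a minimum, the Kafka Streams configuration it returns needs an application id and the bootstrap servers, roughly like this (the property values are illustrative, and it requires java.util.Properties, org.apache.kafka.streams.StreamsConfig and org.apache.kafka.common.serialization.Serdes):

// Hypothetical sketch of the configuration returned by configRetriever.getConfig()
private Properties getConfig() {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-normalizer");   // illustrative application id
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");        // illustrative broker list
    // String serdes by default; the RawMetric/NormalizedMetric serdes are set explicitly in the topology
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    return config;
}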

The end

In this post we’ve shown a complete, end-to-end example of a data pipeline with Kafka Streams, using windows and key/value stores. The whole project can be found here, including a test with the TopologyTestDriver provided by Kafka. However, at the time of writing this post, the commit interval of this object must be taken into account when dealing with windows: TopologyTestDriver commits after each record it processes (this includes flushing the KTable state store caches), whereas, when you run against the cluster, Kafka Streams by default only commits every 30 seconds, so consecutive updates are “de-duplicated”. On the other hand, state stores can be tested without problems, but don’t forget to close your test driver after every test in order to get your data cleaned up.
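As a reference, a test could drive the topology roughly like this (topic names and payload variables are illustrative, not taken from the project):

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

import java.util.Properties;

public class MetricsTopologyTestSketch {

    public void pipeTwoMetrics(Topology topology, String memoryEventJson, String cpuEventJson) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-normalizer-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver

        TopologyTestDriver driver = new TopologyTestDriver(topology, props);
        ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>("raw-metrics", new StringSerializer(), new StringSerializer());

        // Two raw events for the same host; the test driver commits after each record
        driver.pipeInput(factory.create("raw-metrics", "server-01", memoryEventJson));
        driver.pipeInput(factory.create("raw-metrics", "server-01", cpuEventJson));

        ProducerRecord<String, String> output =
                driver.readOutput("normalized-metrics", new StringDeserializer(), new StringDeserializer());
        // ... assertions on the output record would go here ...

        // Always close the driver so local state is cleaned up between tests
        driver.close();
    }
}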




David Oñoro

I'm a telecommunications engineer with more than 10 years of experience with event-oriented architectures (messaging middlewares, complex event processing, etc). Over the last years, I've specialized in big data architectures, with a special interest in real-time and streaming technologies. When I'm not working in Datio's architecture team, you can find me taking care of two little kids. Or maybe swimming, if I'm lucky...
