Rick

Wednesday, September 30, 2015

QBit supports Metrics, KPI gathering for runtime stats and reactive stats for Microservices

For some background on why this is important for microservices see Reactive Microservices Monitoring.

QBit supports Metrics, KPI gathering

QBit supports collecting metrics for microservices. The QBit runtime statistics system can be queried, it can be clustered, and it can replicate to any statistics system. The core interfaces for the QBit runtime stats system live in the package io.advantageous.qbit.service.stats. The main interface for collecting stats is StatsCollector.

StatsCollector

package io.advantageous.qbit.service.stats;

import io.advantageous.qbit.client.ClientProxy;

/**
 * Collects stats
 * This collects key performance indicators: timings, counts and levels/gauges.
 * Created by rick on 6/6/15.
 */
public interface StatsCollector extends ClientProxy {


    /** Increment a counter by 1.
     * This is a shortcut for recordCount(name, 1);
     * @param name name of the metric, KPI, stat.
     */
    default void increment(String name) {
    }

    /**
     * Record a count.
     * Used to record things like how many users used the site.
     *
     * @param name name of the metric, KPI, stat
     * @param count count to record.
     */
    default void recordCount(String name, long count) {
    }

    /**
     * Record a level. Some systems call this a gauge.
     * This is used to record things like the count of current threads,
     * free system memory, free disk, etc.
     * @param name name of the gauge or level
     * @param level level
     */
    default void recordLevel(String name, long level) {
    }

    /**
     * This is used to record timings.
     * This would be things like how long did it take this service to call
     * this remote service.
     * @param name name of the timing
     * @param duration duration
     */
    default void recordTiming(String name, long duration) {
    }

}
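
Given a StatsCollector reference (later in this article we build one via the ManagedServiceBuilder), recording KPIs is just a method call. Here is a minimal sketch; the statsCollector variable and the metric names are made up for illustration:

// Minimal sketch: recording KPIs with a StatsCollector.
// The statsCollector instance and the metric names are illustrative.
statsCollector.increment("todoService.cache.hit");              // counter, +1
statsCollector.recordCount("todoService.items.loaded", 42);     // counter, arbitrary amount
statsCollector.recordLevel("todoService.cache.size", 1024);     // level/gauge
statsCollector.recordTiming("todoService.repo.load.time", 35);  // duration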

ServiceStatsListener

You will probably never use a StatsCollector directly but rather a StatsCollectorBuffer, which buffers metric calls to reduce IO when reporting to the stats engine. Another important concept in this package is the ServiceStatsListener. The ServiceStatsListener gets registered on your behalf if you use the ManagedServiceBuilder.
The ServiceStatsListener is used to intercept queue calls for the ServiceQueue. All services and end-points end up using the ServiceQueue. This class is able to track stats for services.

Default Service Stat Keys

   startBatchCountKey = serviceName + ".startBatchCount";
   receiveCountKey = serviceName + ".receiveCount";
   receiveTimeKey = serviceName + ".callTimeSample";
   queueRequestSizeKey = serviceName + ".queueRequestSize";
   queueResponseSizeKey = serviceName + ".queueResponseSize";
The ${serviceName}.startBatchCount tracks how many times a batch has been sent.
This can tell you how well your batching is set up.
The ${serviceName}.receiveCount is how many times the service has been called.
The ${serviceName}.callTimeSample is how long methods take for this service (if enabled, call times are sampled).
The ${serviceName}.queueRequestSize keeps track of how large the request queue is. A value greater than 0 is an indication of calls not getting handled; if it continues to rise, the service could be down. (Note there is a health check to see if a queue is blocked, and the service will be marked unhealthy.)
The ${serviceName}.queueResponseSize keeps track of how large the response queue is getting. This is an indication that responses are not getting drained.
All of the classes that we covered so far are in QBit core. This means that stats and KPI gathering are just part of the QBit system. Metrics are an integral part of microservices, so they are an integral part of QBit.

StatService and StatsD

The StatService is in the QBit admin package. The StatService interface allows you both to record stats, KPIs, and metrics for microservices and to query them. The StatService can replicate KPIs (key performance indicators) to replicators, and it does this efficiently.
Let's look at the StatService interface and its comments.
package io.advantageous.qbit.metrics;
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.service.stats.Stats;
import io.advantageous.qbit.service.stats.StatsCollector;


/**
 * The StatService collects stats, and allows stats to be queried.
 * This collects key performance indicators: timings, counts and levels/gauges.
 * It also allow internal or external clients to query this system.
 *
 * Created by rick on 6/6/15.
 */
public interface StatService extends StatsCollector {


    /**
     * Get the last n Seconds of stats (up to two minutes of stats typically
     * kept in memory).
     *
     * The `Stat` object has the mean, median, etc.
     *
     * ```java
     *
     *    private final float mean;
     *    private final float stdDev;
     *    private final float variance;
     *    private final long sum;
     *    private final long max;
     *    private final long min;
     *    private final long median;
     * ```
     * @param callback callback to get Stat
     * @param name name metric, KPI, etc.
     * @param secondCount secondCount
     */
    default void statsForLastSeconds(Callback<Stats> callback, String name,
                                                             int secondCount) {
    }

    /**
     * Gets the average of a level over the last n seconds.
     *
     * @param callback callback
     * @param name name of metric, KPI, etc.
     * @param secondCount secondCount
     */
    default void averageLastLevel(Callback<Long> callback, String name,
                                                             int secondCount) {
    }

    /**
     * Gets count of the current minute
     *
     * @param callback callback
     * @param name name of metric
     */
    default void currentMinuteCount(Callback<Long> callback, String name) {
    }


    /**
     * Gets count of the current second.
     *
     * @param callback callback
     * @param name name of metric
     */
    default void currentSecondCount(Callback<Long> callback, String name) {
    }


    /**
     * Gets count of the last recorded full second.
     *
     * @param callback callback
     * @param name name of metric
     */
    default void lastSecondCount(Callback<Long> callback, String name) {
    }


    /**
     * Gets count of the last recorded ten full seconds.
     *
     * @param callback callback
     * @param name name of metric
     */
    default void lastTenSecondCount(Callback<Long> callback, String name) {
    }


    /**
     * Gets count of the last recorded five full seconds.
     *
     * @param callback callback
     * @param name name of metric
     */
    default void lastFiveSecondCount(Callback<Long> callback, String name) {
    }


    /**
     * Gets count of the last recorded N full seconds.
     *
     * @param callback callback
     * @param name name of metric
     * @param secondCount secondCount
     */
    default void lastNSecondsCount(Callback<Long> callback, String name,
                                                              int secondCount) {
    }


    /**
     * Gets count of the last recorded N full seconds.
     * This is more exact if the count overlaps two minutes.
     *
     * @param callback callback
     * @param name name of metric
     * @param secondCount secondCount
     */
    default void lastNSecondsCountExact(Callback<Long> callback, String name,
                                                              int secondCount) {
    }


    /**
     * Gets count of the last recorded ten full seconds.
     * This is more exact if the count overlaps two minutes.
     *
     * @param callback callback
     * @param name name of metric
     */
    default void lastTenSecondCountExact(Callback<Long> callback, String name) {
    }

    /**
     * Gets count of the last recorded five full seconds.
     * This is more exact if the count overlaps two minutes.
     *
     * @param callback callback
     * @param name name of metric
     */
    default void lastFiveSecondCountExact(Callback<Long> callback, String name) {
    }

    /**
     * Record a count with a given timestamp.
     * @param name name of metric
     * @param count count
     * @param  timestamp timestamp
     */
    default void recordWithTime(String name, int count, long timestamp) {
    }


    /**
     * Bulk record.
     * @param names names of metric
     * @param counts counts of metrics
     * @param  timestamp timestamp
     */
    default void recordAll(long timestamp, String[] names, long[] counts) {
    }


    /**
     * Bulk record.
     * @param names names of metric
     * @param counts counts of metrics
     * @param  times times
     */
    default void recordAllWithTimes(String[] names,
                                 long[] counts, long[] times){
    }
}
You can query the metrics system to provide reactive support. For example, you could query the current requests per second for a service and dynamically change the size of buffering to increase throughput.
QBit does not only monitor metrics; it makes the metrics queryable so your microservices can be reactive.
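Here is a minimal sketch of a service reacting to its own request rate. The statService proxy, the metric name, and the reaction are assumptions for illustration; lastSecondCount comes from the interface shown above:

// Minimal sketch: reacting to a KPI. The statService proxy and the
// "todoService.receiveCount" metric name are assumptions for illustration.
statService.lastSecondCount(requestsLastSecond -> {
    if (requestsLastSecond > 10_000) {
        // Application-specific reaction, e.g., increase batch size or shed load.
        System.out.println("High load: " + requestsLastSecond + " calls in the last second");
    }
}, "todoService.receiveCount");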
The StatService system that comes with QBit can replicate changes to other systems via the StatReplicator.
/**
 * Stat Replicator.
 * This is used to replicate stats to another system.
 * created by rhightower on 1/28/15.
 */
public interface StatReplicator extends RemoteTCPClientProxy, ServiceFlushable, Stoppable {
    void replicateCount(String name, long count, long time);
    void replicateLevel(String name, long level, long time);
    void replicateTiming(String name, long timing, long time);
}
The QBit Admin package has two built-in collectors. The StatsDReplicator (notice the StatsD) implements StatReplicator and replicates via UDP to a StatsD server (e.g., Graphite, Statsite, and more). StatsD is a wire protocol over UDP for sending stats, and the StatsDReplicator implements this wire protocol to talk to a given host and port. The other built-in collector is the LocalStatsCollector, which serves stats over a REST endpoint (/__stats/instance) that delivers a JSON version of the stats; it keeps collecting stats until some other system queries the /__stats/instance endpoint, and it resets the stats after each request. Both the StatsDReplicator and the LocalStatsCollector have builders, but you typically get them for free by using the ManagedServiceBuilder. We use the LocalStatsCollector for Heroku-like environments.
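If you want to see what the LocalStatsCollector is currently holding, you can read the /__stats/instance endpoint directly. Here is a minimal sketch using plain java.net; the host and port are assumptions, so point it at whatever server your ManagedServiceBuilder started:

import java.io.InputStream;
import java.net.URL;
import java.util.Scanner;

public class StatsEndpointCheck {
    public static void main(String[] args) throws Exception {
        // Host and port are assumptions for illustration.
        final URL url = new URL("http://localhost:8080/__stats/instance");
        try (InputStream in = url.openStream();
             Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            // The endpoint returns a JSON snapshot of the collected stats.
            System.out.println(scanner.hasNext() ? scanner.next() : "");
        }
    }
}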
You can configure StatsD via the ManagedServiceBuilder.

Configure StatsD with ManagedServiceBuilder

        if (config.isStatsD()) {
            managedServiceBuilder.setEnableStatsD(true);
            managedServiceBuilder.getStatsDReplicatorBuilder()
                                 .setHost(config.getStatsDHost());

            if (config.getStatsDPort() != -1) {
                managedServiceBuilder.getStatsDReplicatorBuilder()
                                 .setPort(config.getStatsDPort());
            }
        }
If you are using the JSON config file, you set up StatsD as follows:
{
     "statsD" : true,
     "statsDHost" : "lab99.myhost.com"
}
You can send your own stats, not just the ones that are sent via the default stats gathering.
Assume you have a service called TodoService.

Using stats from your own service

        /** Create a stats collector. */
        final StatsCollector statsCollector = managedServiceBuilder
                .getStatServiceBuilder().buildStatsCollector();

        final TodoService todoService = new TodoService(statsCollector,
                ReactorBuilder.reactorBuilder().build(),
                taskRepo,
                Timer.timer());

        /** Add the todo service to the managedServiceBuilder. */
        managedServiceBuilder.addEndpointService(todoService);
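
After the service is registered, you start the endpoint server. A minimal sketch, assuming the usual ManagedServiceBuilder startup pattern:

        /* A minimal sketch, assuming the usual ManagedServiceBuilder startup pattern. */
        managedServiceBuilder.getEndpointServerBuilder()
                .build()
                .startServer();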

Passing stats collector, reactor and timer.

    public TodoService(final StatsCollector statsCollector,
                                  final Reactor reactor,
                                  final TaskRepo taskRepo,
                                  final Timer timer) {
        this.statsCollector = statsCollector;
        this.timer = timer;
        this.taskRepo = taskRepo;
        this.reactor = reactor;

        this.reactor.addServiceToFlush(statsCollector);
    }
Calling reactor.addServiceToFlush and passing the statsCollector ensures that when the service queue managing the TodoService is idle or has hit its batch limit, all of the stats will be flushed if there are any to save. The statsCollector is the one that does the buffering, as mentioned earlier.
The reactor does not auto-flush unless it is told to do so. For now, you always use the reactor with the following queue callback (no magic).

Calling reactor so that it can run jobs, coordinate calls and flush proxies

    /** Process Reactor stuff. */
    @QueueCallback({QueueCallbackType.LIMIT, QueueCallbackType.EMPTY})
    public void process(){
        reactor.process();
        time = timer.time();
    }
The reactor.process() call will flush all buffered calls to the statsCollector, which then sends the stats to the actual StatService, where they are replicated to all registered replicators.

Using recordCount

    /**
     * Load TODOs from TodoRepo.
     */
    @RequestMapping(value = "/todo", summary = "Load TODOs",
            ...)
    public void loadTodo(final Callback<Boolean> callback) {

        final Set<TodoCategory> categories = new HashSet<>(this.categories);

        /* If there are no categories or if service is paused, then return right away. */
        if (categories.size() > 0 && !stop) {
            loadFromTodoRepoCache++;
            statsCollector.recordCount("Todo.repo.call.count", 1);
        } else {
            logger.warn("Service can't load categories count {} or stopped {}",
            components.size(), stop);
            return;
        }
        ...
Notice the use of statsCollector.recordCount("Todo.repo.call.count", 1). Since this is just incrementing by one, we can instead call statsCollector.increment("Todo.repo.call.count").

Using increment

    /**
     * Load TODOs from TodoRepo.
     */
    @RequestMapping(value = "/todo", summary = "Load TODOs",
            ...)
    public void loadTodo(final Callback<Boolean> callback) {

        final Set<TodoCategory> categories = new HashSet<>(this.categories);

        /* If there are no categories or if service is paused, then return right away. */
        if (categories.size() > 0 && !stop) {
            loadFromTodoRepoCache++;
            statsCollector.increment("Todo.repo.call.count");
        } else {
            logger.warn("Service can't load categories count {} or stopped {}",
            components.size(), stop);
            return;
        }
        ...
Now let's show a timing.

Using statsCollector.recordTiming to time how long a bunch of async calls took

    /**
     * Load TODOs from TodoRepo.
     */
    @RequestMapping(value = "/todo", summary = "Load TODOs",
            ...)
    public void loadTodo(final Callback<Boolean> callback) {

        final Set<TodoCategory> categories = new HashSet<>(this.categories);

        /* If there are no categories or if service is paused, then return right away.*/
        ...


        final long startTime = timer.time();

        /* For each TodoCategory call TodoRepo to load the todo items. */
        categories.forEach(category -> {

            final Callback<List<Todo>> todoCacheCallback =
                    createLoadFromCacheCallback(count, errorCount, category);
            taskRepo.loadTodosFromCache(todoCacheCallback, category);

        });

        /* Coordinate until all of the callbacks are done. */
        reactor.coordinatorBuilder()
                /* If the success count is equal to the
                    number of categories, we are done. */
                .setCoordinator(() -> {
                            if (logger.isDebugEnabled()) {
                                logger.debug("COUNT " + count.get());
                            }
                            return count.get() == categories.size();
                        }
                )
                /* Set the timeout to be seconds times two since
                    we are calling two services. */
                .setTimeoutDuration(config.getTimeoutMakingRemoteCallInSeconds() * 2)
                .setTimeoutTimeUnit(TimeUnit.SECONDS)
                /* If there were no errors, record the timing and return success. */
                .setFinishedHandler(() -> {
                           statsCollector
                           .recordTiming("Todo.loadCache.time",
                                   timer.time() - startTime);
                           callback.returnThis(true);
                })
                /* Set the timeout handler to return no
                      success and log that there was a timeout. */
                .setTimeOutHandler(() -> {
                    logger.error("Timeout while loading todo items" );
                    callback.returnThis(false);
                }).build();

...
This records a start time (startTime = timer.time()) and then makes a bunch of async calls. When all of the async calls return, we record how long the whole process took using statsCollector.recordTiming. To really understand the complex call coordination with the QBit reactor, you first need to understand how QBit coordinates calls. You can learn more about this in the QBit Reactive Microservices Tutorial, which covers handling async calls with the reactor.
Here is a simpler timing example, timing a single call to Cassandra.

Timing how long a single call to Cassandra took

    public void executeAsyncCassandraCall(final Callback<ResultSet> callback,
                                     final Statement stmt) {
        final ResultSetFuture future = this.session.executeAsync(stmt);
        final long startTime = timer.time();

        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                statsCollector
                           .recordTiming("Cassandra.load.time",
                                   timer.time() - startTime);
                callback.accept(result);

            }

            @Override
            public void onFailure(Throwable t) {
                statsCollector
                           .recordTiming("Cassandra.load.error.time",
                                   timer.time() - startTime);
                callback.onError(t);
            }
        });

    }
Notice that we use final long startTime = timer.time(), and we record one of two timings: how long the successful call took or how long the failed call took.
To store a level (gauge), just use the following style.

Store a level

        statsCollector.recordLevel("Todo.categories.size",
                     categories.size());
Remember, a level is a gauge: how large is my cache, how many outstanding items are in my queue, etc.
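For instance, here is a minimal sketch of a couple of JVM-level gauges you might record from the same periodic process() callback (the metric names are illustrative):

// Minimal sketch: recording gauges/levels. The metric names are illustrative.
statsCollector.recordLevel("jvm.thread.count", Thread.activeCount());
statsCollector.recordLevel("jvm.memory.free", Runtime.getRuntime().freeMemory());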