Optimizing accesses

Creating and accessing millions of Runs, SubRuns, or Events can have a large performance impact. Hence, multiple optimizations are available to speed up these accesses.

Batching writes

The creation of Runs, SubRuns, and Events, as well as the storage of data products can be batched. The following code sample illustrates how to use the WriteBatch object for this purpose.

main.cpp

#include <iostream>
#include <string>
#include <hepnos.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/vector.hpp>

using namespace hepnos;

struct Particle {

    std::string name;
    double x, y, z;

    Particle() {}

    Particle(const std::string& name, double x, double y, double z)
    : name(name), x(x), y(y), z(z) {}

    template<typename Archive>
    void serialize(Archive& ar, const unsigned int version) {
        ar & name;
        ar & x;
        ar & y;
        ar & z;
    }
};

int main(int argc, char** argv) {

    if(argc != 3) {
        std::cerr << "Usage: " << argv[0] << " <protocol> <configfile>" << std::endl;
        exit(-1);
    }

    DataStore datastore = DataStore::connect(argv[1], argv[2]);
    // Get the root of the DataStore
    DataSet root = datastore.root();
    // Create a DataSet
    DataSet example11 = root.createDataSet("example11");
    // Create a Run, a SubRun, and an Event, but delay
    // the actual creation using a WriteBatch
    {
        WriteBatch batch(datastore);
        Run run = example11.createRun(batch, 1);
        SubRun subrun = run.createSubRun(batch, 4);
        Event event = subrun.createEvent(batch, 32);
        // Store a product into the event
        Particle p("electron", 3.4, 4.5, 5.6);
        ProductID pid = event.store(batch, "mylabel", p);
        // The batch is flushed at the end of the scope
    }
    // Reload a product from the event
    {
        auto run = example11[1];
        auto subrun = run[4];
        auto event = subrun[32];
        Particle p;
        bool b = event.load("mylabel", p);
        if(b) std::cout << "Particle loaded successfully" << std::endl;
        else  std::cout << "Particle wasn't loaded" << std::endl;
    }
}

The WriteBatch object is initialized with a datastore. A second argument, unsigned int max_batch_size (which defaults to 128), can be provided to indicate that at most this many operations may be batched together. Once this many operations have been added to the batch, the batch automatically flushes its content. The WriteBatch can be flushed manually using WriteBatch::flush(), and any remaining operations are flushed automatically when the WriteBatch goes out of scope.

The WriteBatch object can be passed to DataSet::createRun, Run::createSubRun, SubRun::createEvent, as well as all the store methods.
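
For instance, a minimal sketch of controlling the batch size and flushing manually could look like the following. The function name, the batch size of 1024, and the run/subrun/event numbers are illustrative choices, not part of the HEPnOS API.

#include <hepnos.hpp>

// Sketch: explicit batch size and manual flush. The value 1024 is
// illustrative; the second constructor argument is max_batch_size,
// as described above.
void write_batched(hepnos::DataStore& datastore, hepnos::DataSet& dataset) {
    hepnos::WriteBatch batch(datastore, 1024);
    for(unsigned i = 0; i < 100; i++) {
        hepnos::Run run       = dataset.createRun(batch, i);
        hepnos::SubRun subrun = run.createSubRun(batch, 0);
        hepnos::Event event   = subrun.createEvent(batch, 0);
        // ... store products into the event with the same batch ...
    }
    batch.flush(); // force pending operations to be written now
    // any remaining operations are also flushed when batch goes out of scope
}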

Note

The max_batch_size doesn’t represent the total number of items that have to be written to trigger a flush. The WriteBatch internally keeps as many batches of key/value pairs as the number of underlying databases, each batch with its own limit of max_batch_size. Hence, if max_batch_size is 128 and the client has written 254 items, 127 of which go into one database and 127 into another, the WriteBatch won’t automatically flush either batch until it reaches 128 items.

Prefetching reads

Prefetching is a common technique to speed up read accesses. Used alone, the Prefetcher class will read batches of items when iterating through a container. The following code sample illustrates its use.

main.cpp

#include <iostream>
#include <string>
#include <hepnos.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/vector.hpp>

using namespace hepnos;

struct Particle {

    std::string name;
    double x, y, z;

    Particle() {}

    Particle(const std::string& name, double x, double y, double z)
    : name(name), x(x), y(y), z(z) {}

    template<typename Archive>
    void serialize(Archive& ar, const unsigned int version) {
        ar & name;
        ar & x;
        ar & y;
        ar & z;
    }
};

int main(int argc, char** argv) {

    if(argc != 3) {
        std::cerr << "Usage: " << argv[0] << " <protocol> <configfile>" << std::endl;
        exit(-1);
    }

    DataStore datastore = DataStore::connect(argv[1], argv[2]);

    // Get the root of the DataStore
    DataSet root = datastore.root();
    // Create a DataSet
    DataSet example12 = root.createDataSet("example12");
    // Create a Run, a SubRun, and many Events
    Run run = example12.createRun(1);
    SubRun subrun = run.createSubRun(4);
    for(unsigned i = 0; i < 20; i++) {
        Event event = subrun.createEvent(32+i);
        // Store a product into the event
        Particle p("electron", 3.4+i, 4.5+i, 5.6+i);
        ProductID pid = event.store("mylabel", p);
    }
    // Reload using a Prefetcher
    Prefetcher prefetcher(datastore);
    // Enable loading Particle objects associated with the label "mylabel"
    prefetcher.fetchProduct<Particle>("mylabel");
    // Loop over the events in the SubRun using the Prefetcher
    for(auto& event : prefetcher(subrun)) {
        Particle p;
        bool b = event.load(prefetcher, "mylabel", p);
        if(b) std::cout << "Particle loaded successfully" << std::endl;
        else  std::cout << "Particle wasn't loaded" << std::endl;
    }
}

The Prefetcher object is initialized with a DataStore instance, and may also be passed an unsigned int cache_size and an unsigned int batch_size. The cache size is the maximum number of items that can be prefetched and stored in the prefetcher’s cache. The batch size is the number of items that are requested from the underlying DataStore in a single operation.

A Prefetcher instance can be passed to most functions from the RunSet, Run, and SubRun classes that return an iterator. This iterator will then use the Prefetcher when iterating through the container. The syntax illustrated above, passing the subrun to the Prefetcher::operator()() method, shows a simple way of enabling prefetching in a modern C++ style for loop.

By default, a Prefetcher will not prefetch products. To enable prefetching products as well, the Prefetcher::fetchProduct<T>(label) method can be used. This method does NOT load any products; it tells the Prefetcher to prefetch products of type T with the specified label as the iteration proceeds. The load call used to read the product must then take the prefetcher instance as its first argument, so that it looks in the prefetcher’s cache rather than in the datastore.
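
As an illustration, a Prefetcher with explicit sizes might be set up as follows. This is a sketch that reuses the Particle struct from the examples above and assumes the cache size and batch size are passed in the order described; the function name and the numeric values are illustrative.

#include <hepnos.hpp>
// Particle is the serializable struct defined in the examples above.

// Sketch: prefetcher with explicit cache and batch sizes, passed in the
// order (cache_size, batch_size) described above.
void read_prefetched(hepnos::DataStore& datastore, hepnos::SubRun& subrun) {
    unsigned int cache_size = 64;  // at most 64 prefetched items kept in the cache
    unsigned int batch_size = 16;  // items requested per underlying operation
    hepnos::Prefetcher prefetcher(datastore, cache_size, batch_size);
    prefetcher.fetchProduct<Particle>("mylabel"); // also prefetch these products
    for(auto& event : prefetcher(subrun)) {
        Particle p;
        if(event.load(prefetcher, "mylabel", p)) {
            // consume the prefetched product here
        }
    }
}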

Important

If prefetching is enabled for a given product/label, it is expected that the client program consumes the prefetched product by calling load. If it does not, the prefetcher’s memory will fill up with prefetched products that are never consumed.

Using asynchronous operations

Most of the operations on Runs, SubRuns, and Events, as well as Prefetcher and WriteBatch, can be turned asynchronous simply by using an AsyncEngine instance. The following code illustrates how.

main.cpp

#include <iostream>
#include <string>
#include <hepnos.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/vector.hpp>

using namespace hepnos;

struct Particle {

    std::string name;
    double x, y, z;

    Particle() {}

    Particle(const std::string& name, double x, double y, double z)
    : name(name), x(x), y(y), z(z) {}

    template<typename Archive>
    void serialize(Archive& ar, const unsigned int version) {
        ar & name;
        ar & x;
        ar & y;
        ar & z;
    }
};

int main(int argc, char** argv) {

    if(argc != 3) {
        std::cerr << "Usage: " << argv[0] << " <protocol> <configfile>" << std::endl;
        exit(-1);
    }

    DataStore datastore = DataStore::connect(argv[1], argv[2]);

    // Get the root of the DataStore
    DataSet root = datastore.root();
    // Create a DataSet
    DataSet example13 = root.createDataSet("example13");
    {
        AsyncEngine async(datastore,1);
        // Create a Run, a SubRun, and many Events
        Run run = example13.createRun(async, 1);
        SubRun subrun = run.createSubRun(async, 4);
        for(unsigned i = 0; i < 20; i++) {
            Event event = subrun.createEvent(async, 32+i);
            // Store a product into the event
            Particle p("electron", 3.4+i, 4.5+i, 5.6+i);
            ProductID pid = event.store(async, "mylabel", p);
        }
    }
    // Reload using a Prefetcher and AsyncEngine
    {
        Run run = example13[1];
        SubRun subrun = run[4];

        AsyncEngine async(datastore, 1);
        Prefetcher prefetcher(async);
        // Enable loading Particle objects associated with the label "mylabel"
        prefetcher.fetchProduct<Particle>("mylabel");
        // Loop over the events in the SubRun using the Prefetcher
        for(auto& event : prefetcher(subrun)) {
            Particle p;
            bool b = event.load(prefetcher, "mylabel", p);
            if(b) std::cout << "Particle loaded successfully" << std::endl;
            else  std::cout << "Particle wasn't loaded" << std::endl;
        }
    }
}

The AsyncEngine object is initialized with a DataStore instance and a number of threads to spawn. Note that using 0 threads is perfectly fine: since the AsyncEngine turns all communication operations into non-blocking operations, the absence of background threads does not prevent it from making some amount of progress in the background.

The AsyncEngine object can be passed to DataSet::createRun, Run::createSubRun, SubRun::createEvent, as well as all the store methods. When used, these operations will be queued in the AsyncEngine and eventually execute in the background.
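
For example, a sketch of asynchronous creation with no background thread might look like the following; the function name and the run/subrun/event numbers are illustrative.

#include <hepnos.hpp>

// Sketch: asynchronous creation with no background thread.
void create_async(hepnos::DataStore& datastore, hepnos::DataSet& dataset) {
    hepnos::AsyncEngine async(datastore, 0); // 0 threads: non-blocking progress only
    hepnos::Run run       = dataset.createRun(async, 42);
    hepnos::SubRun subrun = run.createSubRun(async, 0);
    hepnos::Event event   = subrun.createEvent(async, 0);
    // operations are queued in the AsyncEngine and executed in the background;
    // following the example above, the AsyncEngine is kept in its own scope so
    // that pending operations can finish before the data is read back
}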

The AsyncEngine instance can also be passed to the constructor of WriteBatch and Prefetcher. When used with a WriteBatch, the AsyncEngine continually takes operations from the WriteBatch, batches them, and executes them. Hence the batches issued by the AsyncEngine may be smaller than the maximum batch size of the WriteBatch object.
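
For instance, a sketch combining the two, assuming the WriteBatch constructor accepts the AsyncEngine in place of the DataStore as described above, might look like this; the function name, thread count, and numbers are illustrative.

#include <hepnos.hpp>

// Sketch: a WriteBatch drained by an AsyncEngine; constructor usage
// follows the description above.
void write_batched_async(hepnos::DataStore& datastore, hepnos::DataSet& dataset) {
    hepnos::AsyncEngine async(datastore, 1); // one background thread
    hepnos::WriteBatch batch(async);         // operations are taken by the AsyncEngine
    hepnos::Run run       = dataset.createRun(batch, 7);
    hepnos::SubRun subrun = run.createSubRun(batch, 0);
    hepnos::Event event   = subrun.createEvent(batch, 0);
    // the background thread continually batches and executes the queued
    // operations, possibly in batches smaller than the maximum batch size
}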

When used with a Prefetcher, the Prefetcher will prefetch asynchronously using the AsyncEngine’s threads.