Scale Out mit Hazelcast

4. September 2019 Von Thomas Müller

Wie man mit Hazelcast horizontal skalierbare Applikationen bauen kann

Anforderung: Paralleles Crawling von Postings

In einem Kundenprojekt sollen alle Postings von diversen User-Accounts eines externen Dienstes gesammelt werden. Da der Dienst keine Möglichkeit anbietet, die Postings über eine API abzufragen, kam die Idee, die Postings direkt von der Webseite zu crawlen. Das Risiko der Fehleranfälligkeit bzgl. des Parsen einer Webseite wurde dabei bewusst in Kauf genommen.

Da es sich bei dem Dienst um eine React-Anwendung handelt, war dann auch klar, dass das statische Parsen der einzelnen Seiten nicht möglich ist. Es musste also jede zu crawlende Seite noch zusätzlich mit einem Tool geparst werden, um den JavaScript-Code zu interpretieren, aus dem der dynamische DOM-Baum mit den zu extrahierenden Inhalten erzeugt wurde. Danach konnte man auf die eigentlichen Inhalte per XPath zugreifen.

Es sollen bis zu 4.000 Accounts regelmäßig auf Postings überprüft und auch abgeholt werden. Problem dabei sind die Performance der Kombination aus Netzwerk-Kommunikation, Parsen der Webseite mit Selenium (Chromedriver) sowie die Nutzung des Chrome-Browsers im Headless Mode auf dem Server. Um die Informationen eines einzelnen Postings zu erhalten, kann dies bis zu 2-3 Sekunden pro Posting benötigen. Wenn man das nun auf 4.000 Accounts mit all ihren Postings hochrechnet, kommt man schnell darauf, dass dies ohne eine Skalierung nicht möglich ist.

Bei ersten Lasttests zeigte sich, dass der Google-Chrome-Browser bei paralleler Abarbeitung den Server mit einem OutOfMemory-Fehler relativ schnell in die Knie zwingt. Von daher war auch bald klar, dass eine Vertikalskalierung (Scale Up) wie z. B. eine Erhöhung des RAM-Speichers hier nicht viel bringen würde und wir den Weg über das horizontale Skalieren (Scale Out) gehen mussten.

Für unseren Anwendungsfall bedeutete das, dass alle laufenden Anwendungen parallel die entsprechenden Accounts crawlen und die dazugehörigen Postings einsammeln und weiterverarbeiten. Um horizontal zu skalieren, musste ein verteiltes Queueing her, das architekturell wie in der Abbildung funktionieren sollte: Alle zu crawlenden Accounts stehen in einer Liste, die als Messages in die einzelnen Crawling-Queues eingefügt werden. Die Crawler-Instanzen nehmen sich dann immer die nächsten Accounts aus der Queue heraus. Damit ist sichergestellt, dass nicht mehrere Crawler denselben Account bearbeiten:

Die nächste Frage war nun, mit welchen Mitteln wir das Messaging umsetzen. Wir wollten keinen neuen, extra einzurichtenden Messaging-Server wie z. B. RabbitMQ, Apache Kafka o.ä. haben, da dies nur zusätzlichen Administrationsaufwand bedeuten würde. Die Entscheidung fiel daher auf ein In-Memory Data Grid.

In-Memory Data Grids

In-Memory Data Grids zeichnen sich dadurch aus, dass man Daten auf mehreren verteilten Servern im Hauptspeicher hält. Es können dynamisch neue Server hinzugefügt oder auch wieder entfernt werden. Somit entsteht eine hohe horizontale Skalierbarkeit. Wichtige Vertreter von In-Memory Data Grids sind

  • Hazelcast
  • Ehcache
  • Oracle Coherence

Aufgrund der guten Erfahrungen bzgl. der Integration in Spring-Projekten, der Möglichkeit alles in einem Embedded-Mode zu fahren sowie der mittlerweile großen Verbreitung in Entwicklerkreisen, entschieden wir uns zum Einsatz von Hazelcast.

Hazelcast

Hazelcast selbst ist ein In-Memory Data Grid mit sehr vielen Features wie z. B. Queues/Topics, Executors, Key-Value-Store, etc. Es existiert eine Open-Source-Version, zusätzlich dazu bietet die kommerzielle Version noch weitere Features an.

Hazelcast-Betrieb

Um Hazelcast zu betreiben, gibt es zwei unterschiedliche Modi:

  • Embedded-Mode
  • Client-Server-Mode

Im Embedded-Mode wird Hazelcast in jeder Applikation, in der es eingebunden ist, als eigenes Hazelcast-Member hochgefahren. Jede Applikation kommuniziert dann direkt per API-Client mit dem entsprechenden Hazelcast-Member

Im Client-Server-Mode werden die einzelnen Hazelcast-Member eigenständig hochgefahren und bilden dann so das Cluster. Über API-Clients kann anschließend auf das Cluster zugegriffen werden.

Clustering

Hazelcast bietet mehrere Möglichkeiten, ein Cluster aufzuziehen:

  • Der einfachste Weg ist per „Multicast Discovery“ wo sich alle Hazelcast-Member per UDP automatisch finden. Dies ist auch der Standardweg von Hazelcast, der keine weitere Konfiguration benötigt.
  • Es ist auch möglich, die Adressen der Members direkt per TCP zu spezifizieren. Dies ist vor allem dann sinnvoll, wenn nicht die Möglichkeit besteht, dass die einzelnen Members sich per UDP finden können.
  • Bei einem Cloud-Deployment unterstützt Hazelcast eine automatische Discovery für AWS und Google Compute Engine.
  • Hazelcast bietet ein Discovery Service Provider Interface (SPI) an, das jedem erlaubt, einen eigenen Discovery-Mechanismus zu implementieren.
Persistenz

Hazelcast unterstützt zwei Möglichkeiten, die Daten zu persistieren:

  • File-System
  • jeglicher Data-Store bzw. Datenbank

Umsetzung

Die Anwendung selbst ist eine Spring-Boot-Applikation. Da es dort auch für Hazelcast nützliche Erweiterungen gibt, haben wir uns dazu entschieden, Hazelcast im Embedded-Mode laufen zu lassen. Im Netzwerk, in dem die Applikationen dann laufen sollen, ist ein Multicast-Discovery möglich. Auf eine Persistenz haben wir bewusst verzichtet.

Dependencies

Um Hazelcast im Embedded-Mode laufen zu lassen, wird folgende Maven-Dependency benötigt:

<dependency>
  <groupId>com.hazelcast</groupId>
  <artifactId>hazelcast</artifactId>
  <version>3.11</version>
</dependency>

Zur zusätzlichen Spring-Integration haben wir dann noch folgende Dependency hinzugefügt:

<dependency>
  <groupId>com.hazelcast</groupId>
  <artifactId>hazelcast-spring</artifactId>
  <version>3.11</version>
</dependency>
Hazelcast-Konfiguration

Bei der Konfiguration haben wir uns für eine Java-Config entschieden. Selbstverständlich kann die Konfiguration auch in XML geschehen.

HazelcastConfig
public class HazelcastConfig {
 
    private static final Logger LOG = LoggerFactory.getLogger(HazelcastConfig.class);
 
    @Bean
    HazelcastInstance hazelcastInstance(Config hazelCastConfig) {
        final HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(hazelCastConfig);
        return hazelcastInstance;
    }
 
    @Bean
    Config hazelCastConfig(HazelcastProperties hazelcastProperties) {
        LOG.info("Configuring Hazelcast with the following Properties: {}", hazelcastProperties.toString());
 
        Config config = new Config();
        config.getGroupConfig().setName(hazelcastProperties.getGroupName());
        configureNetwork(hazelcastProperties, config);
        config.setProperty("hazelcast.logging.type", "slf4j");
 
        return config;
    }
 
    private void configureNetwork(HazelcastProperties hazelcastProperties, Config config) {
        NetworkConfig network = config.getNetworkConfig();
        network.setPort(hazelcastProperties.getPort());
        network.setPortAutoIncrement(false);
        JoinConfig join = network.getJoin();
        join.getAwsConfig().setEnabled(false);
        join.getMulticastConfig().setEnabled(true);
        join.getTcpIpConfig().setEnabled(false);
    }
}

Damit sich die einzelnen Nodes im Cluster finden, ist es nötig, einen einheitlichen Gruppennamen zu vergeben. Dieser wird hier in der Konfiguration über config.getGroupConfig().setName() anhand der application.properties gesetzt. In der Netzwerk-Konfiguration setzen wir pro Node den jeweiligen Port für Hazelcast. Zusätzlich wird durch join.getMulticastConfig().setEnabled(true) und join.getTcpIpConfig().setEnabled(false) das Multicast-Discovery für jeden Node aktiviert.

HazelcastProperties
@Component
public class HazelcastProperties {
 
    private final String groupName;
    private final Integer port;
 
    public HazelcastProperties(
            @Value("${hazelcast.groupName}") String groupName,
            @Value("${hazelcast.port}") Integer port) {
        this.groupName = groupName;
        this.port = port;
    }
 
    public String getGroupName() {
        return groupName;
    }
 
    public Integer getPort() {
        return port;
    } 
} 
application.properties
hazelcast.port=5701
hazelcast.groupName=webcrawler
Implementierung der Queues

Für das Befüllen und Auslesen der Hazelcast-Queues verwenden wir ein serialisierbares Objekt PostingEvent. Das Objekt selbst hält als Property einen Message-Index als ID, den wir zu Tracing-Zwecken eingeführt haben, sowie den eigentlichen Account-Namen, der zu crawlen ist.

PostingEvent
public class PostingEvent implements Serializable {
 
    private static final long serialVersionUID = -4753098570883793338L;
 
    private String messageIdx;
    private String accountName;
 
    public String getMessageIdx() {
        return messageIdx;
    }
 
    public void setMessageIdx(String messageIdx) {
        this.messageIdx = messageIdx;
    }
 
    public String getAccountName() {
        return accountName;
    }
 
    public void setAccountName(String accountName) {
        this.accountName = accountName;
    }
}

Die beiden nachfolgenden Klassen HazelcastSendingService und HazelcastEventListener steuern die eigentliche Skalierung.

HazelcastSendingService
@Service
public class HazelcastSendingService {
 
    private final HazelcastInstance hazelcastInstance;
 
    public MessageSendingService(HazelcastInstance hazelcastInstance) {
        this.hazelcastInstance = hazelcastInstance;
    }
 
    public void createPostingEvent(final String accountName) {
        final PostingEvent event = new PostingEvent();
        event.setMessageIdx(UUID.randomUUID().toString());
        event.setAccountName(accountName);
        hazelcastInstance.getQueue("accountPostingsQueue").offer(event);
    }
}

Der HazelcastSendingService sorgt dafür, dass die jeweils neu erzeugten PostingEvents in die Queue „accountPostingsQueue“ gelegt werden. Um an eine Queue in Hazelcast heranzukommen, nutzt man die globale Hazelcast-Instanz, mit der man sich dann per getQueue() die entsprechende Queue holt. Danach kann die Queue mit offer() befüllt werden: hazelcastInstance.getQueue(„accountPostingsQueue“).offer(event);

HazelcastEventListener
@Service
public class HazelcastEventListener implements ItemListener {
 
    private static final Logger LOG = LoggerFactory.getLogger(HazelcastEventListener.class);
 
    private final IQueue iqueue;
 
    public HazelcastEventListener(final HazelcastInstance hazelcastInstance) {
        this.iqueue = hazelcastInstance.getQueue("accountPostingsQueue");
        iqueue.addItemListener(this, true);
    }
 
    @Override
    public void itemAdded(ItemEvent itemEvent) {
        try {
            final PostingEvent event = iqueue.take();
            
            // hier beginnt die Weiterverarbeitung des Events 
            // mit dem eigentlichen Crawlen des Accounts
            
        } catch (InterruptedException e) {
            LOG.error(e.getMessage(), e);
        } 
    }
 
    @Override
    public void itemRemoved(ItemEvent itemEvent) {
      LOG.info(itemEvent.toString());
    }
}

Die Klasse HazelcastEventListener ist das Gegenstück, das auf die Queue lauscht.

Dies geschieht durch die Implementierung des Interfaces ItemListener mit seinen beiden Lifecycle-Methoden itemdAdded() und itemRemoved().
Bei einem eingehenden PostingEvent wird die Methode itemAdded() getriggert. Darin wird dann aus der Queue das nächste PostingEvent genommen und zur eigentlichen Weiterverarbeitung zum Crawlen bearbeitet. Das Entnehmen des Events aus der Queue geschieht hier mit: final PostingEvent event = iqueue.take();

Sobald ein Event aus der Queue genommen wurde, wird noch die zweite EventListener-Methode itemRemoved() aufgerufen. Diese dient bei uns jedoch nur der Kontrolle, um die Events zu loggen.

Bewertung

Um eine Anwendung horizontal zu skalieren, gibt es natürlich unterschiedliche Ansätze und Lösungsmöglichkeiten. Oft wird z. B. die Skalierung über einen Loadbalancer geregelt, der dann die eingehenden Requests auf eine bestimmte Anzahl von Anwendungen verteilt. Dies hatte jedoch für uns den Nachteil, dass wir ein weiteres System hätten integrieren müssen, das dann auch noch administriert und betrieben werden muss.

Ein weiterer Vorteil des Skalierens mittels eines Messaging-Mechanismus ist auch, dass man nicht gezwungen ist, die komplette Anwendung zu parallelisieren, sondern ggf. auch nur den Teil skaliert werden kann, der performancekritisch ist. Genau das haben wir auch in unserem Projekt gemacht: Wir haben nur den Teil skaliert, in dem das zeitintensive Crawling durchgeführt wird.

Durch die sehr gute Spring-Integration von Hazelcast war es uns mit relativ wenig Code möglich, die horizontale Skalierung zu implementieren. Da wir uns dafür entschieden haben, Hazelcast im Embedded-Mode zu betreiben, war auch kein zusätzlicher Administrationsaufwand für ein externes Messaging-System nötig.

Weiterführende Informationen / Weblinks

Für weitere Information und Einsatzmöglichkeiten von Hazelcast lohnt es sich, die offizielle Seite zu besuchen: