Autre partie consacrée à HBase: Cette fois ci, je vais appréhender l’utilisation des Filters.

Connexion à HBase en Python

Pour compliquer la chose, je vais illustrer mes exemples en utilisant Python, donc toujours via Thrift.

Pour la génération des classes, rien de bien nouveau: On utilise thrift avec le generator « py »:

$ mkdir /tmp/pyproj ; cd /tmp/pyproj
$ /opt/thrift/bin/thrift -gen py /tmp/hbase-0.92.1/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
$ ls -l gen-py
total 4
-rw-rw-r-- 1 mycroft mycroft    0 Apr  9 22:33 __init__.py
drwxrwxr-x 2 mycroft mycroft 4096 Apr  9 22:36 hbase

Voici l’ébauche du code d’un mini client:

#!/usr/bin/env python
import thrift
import sys

sys.path.append('gen-py')

from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase

transport = TBufferedTransport(TSocket('localhost', 9090))
transport.open()
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)
print(client.getTableNames())

La connexion à HBase en Python étant effectuée, repassons rapidement au shell HBase pour introduire les filters.

Les Filters

Ayant vu les fonctions Get et Scan, HBase permet grâce aux filtres la sélection d’enregistrements de façon plus fine.
On pourra filtrer les enregistrements basé par la présence d’une colonne, par une valeur, par son timestamp ou
encore de façon aléatoire.

Les Filters fonctionnent avec des opérateurs de comparaison (LESS ‘<', GREATER '>‘, EQUAL ‘=’, LESS_OR_EQUAL ‘<=' ..) et des comparateurs qui seront de valeurs de comparaisons (ex: comparateur de valeur, comparateur de prefix, comparateur de bit, de regex, de sous chaine...)

Utilisons un filtre basique: RowFilter. Il permet de filtrer les datas en se basant sur la clef des enregistrements.

RowFilter prend en argument un opérateur de comparaison et un comparateur.

Pour utliser un Filter dans le shell HBase, on utilise la syntaxe suivante:

hbase(main):001:0> scan "logs", { FILTER => "(RowFilter(=, 'binary:2'))" }
ROW        COLUMN+CELL
 2         column=commande:1000_demande, timestamp=1333402542205, value=\x00\x00\x00\x03
 2         column=commande:1000_produit, timestamp=1333402542205, value=\x00\x00\x01\xA4
 2         column=commande:1001_demande, timestamp=1333402542205, value=\x00\x00\x00\x12
...

Le Filter est ici définit sous sa forme textuelle et signifie: donne les row où la clef est égale à 2.

On pourrait aussi dire: donne ls row où la clef est avant 2. On écrirait alors: « (RowFilter(<, 'binary:2'))". Dans ce cas, on remarquera que toutes les clefs commencant par « 1″ seront renvoyées: En effet, HBase n’indexe pas les clefs par ordre numérique mais par ordre alphabétique !

De la même façon, les Filters FamilyFilter, QualifierFilter et ValueFilter permettront respectivement de filtrer par familles, qualifiers et finalement valeur.

Exemple: « (FamilyFilter(=, ‘binaryprefix:invent’)) » retournera toutes les colonnes/valeurs dont le nom de la famille commence par « invent ».

Les Filters peuvent faire l’objet de l’utilisation d’opérateurs binaires. On pourra additionner deux Filter grâce à l’opérateur « AND », par exemple. Illustation: « (QualifierFilter(=, ‘regexstring:.*produit’) AND ValueFilter(=, ‘binary:\x00\x00\x00\x11′) » retournera toutes les colonnes/valeurs dont le qualifier répond à l’expression réguliaire « .*produit » et dont la valeur est égale à \x11.

Il existe plusieurs autres Filters (voir les classes qui héritent de FilterBase, et il est possible de les illustrer en Python.

# Fonction printRow qui marche sur le même principe que la fonction printRecord
# que j'ai utilisé dans les posts précédents
def printRow(obj):
  print "Row: %s" % (obj.row)
  for col in obj.columns:
    if col.endswith('_str'):
      print "%s: %s" % (col, obj.columns[col].value)
    else:
      val = unpack("!I", obj.columns[col].value)[0]
      print "%s: %d" % (col, val)

# Creation d'un objet TScan. Cet objet va contenir la filterString qui va contenir le Filter
scan = Hbase.TScan()
scan.filterString = "(SingleColumnValueFilter('magasin', 'id', =, 'binary:%s'))" % pack("!I", 6)

# Ouverture du scanner ...
scanner = client.scannerOpenWithScan('logs', scan)

# ... le reste est simple à comprendre:
r = client.scannerGet(scanner)
while r:
  printRow(r[0])
  r = client.scannerGet(scanner)
  break

client.scannerClose(scanner)

Les Filters dédiés

Ce filtre est un peu différent des autres filtres vus jusqu’à maintenant. Il ne fait pas que filtrer, mais sélectionne l’enregistrement entier basé uniquement sur la comparaison d’une valeur donnée.

L’exemple du code ci-dessus, utilisant SingleColumnValueFilter, évoquait « si magasin:id = 6, alors renvoie l’enregistrement ».

De la même façon, SingleColumnValueExcludeFilter effectue l’opération strictement inverse: Les enregistrements qui matchent seront exclu.

Un autre filter permet de sortir uniquement un certain nombre de colonne par enregistrement. Il s’agit de ColumnCountGetFilter. Ex:

$ ./test.py
Row: 0
commande:1002_demande: 7
commande:1001_produit: 392
commande:1001_demande: 11
commande:1000_produit: 288
commande:1000_demande: 17
Scanner finished

De la même façon FirstKeyOnlyFilter permet d’accéder à la première colonne d’une key/value.

Filters custom grâce à l’API native

Dans HBase 0.93, un RandomRowFilter existe. Malheureusement, dans la version 0.92.1 que j’utilise maintenant, il est présent mais sous la forme textuelle, il n’est pas supporté. Je me suis mis alors à chercher à développer un nouveau Filter par moi même. Donc, grâce à l’API native, il est possible de faire évoluer l’API et de créer son propre filtre !

Un Filter doit implémenter l’interface suivante:

public interface Filter extends Writable {
    public enum ReturnCode {
        INCLUDE, SKIP, NEXT_COL, NEXT_ROW, SEEK_NEXT_USING_HINT
    }

    public void reset()
    public boolean filterRowKey(byte[] buffer, int offset, int length)
    public boolean filterAllRemaining()
    public ReturnCode filterKeyValue(KeyValue v)
    public void filterRow(List<KeyValue> kvs)
    public boolean hasFilterRow()
    public boolean filterRow()
    public KeyValue getNextKeyHint(KeyValue currentKV)
}

Mon petit filtre qui va afficher au hasard des colonnes:

package org.apache.hadoop.hbase.filter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.hbase.KeyValue;

public class MyOwnRandomFilter extends FilterBase
{
    protected static final Random random = new Random();

    protected float chance;

    public MyOwnRandomFilter()
    {
        super();
    }

    public MyOwnRandomFilter(float chance)
    {
        this.chance = chance;
    }

    @Override
    public ReturnCode filterKeyValue(KeyValue kv)
    {
        float score = random.nextFloat();

        if(score > chance)
        {
            return ReturnCode.NEXT_COL;
        }

        return ReturnCode.INCLUDE;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException
    {
        dataOutput.writeFloat(chance);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException
    {
        chance = dataInput.readFloat();
    }
}

Pour builder:

#!/bin/bash

HBASE_PATH=/tmp/hbase-0.92.1
CLASS_PATH=${HBASE_PATH}/hbase-0.92.1.jar:${HBASE_PATH}/lib/hadoop-core-1.0.0.jar

javac -cp $CLASS_PATH org/apache/hadoop/hbase/filter/MyOwnRandomFilter.java
jar cvf MyOwnRandomFilter.jar org/apache/hadoop/hbase/filter/MyOwnRandomFilter.class

Il faut inclure le .jar dans hbase. Pour cela, modifier conf/hbase-env.sh et compléter la variable HBASE_CLASSPATH:
export HBASE_CLASSPATH=/home/mycroft/dev/hbase_filter/MyOwnRandomFilter.jar

Y a plus qu’à tester dans le shell hbase:

hbase(main):021:0> scan "logs", { LIMIT => 1 , FILTER => org.apache.hadoop.hbase.filter.MyOwnRandomFilter.new(0.01) }
ROW                COLUMN+CELL
 0                 column=commande:1004_produit, timestamp=1333402542197, value=\x00\x00\x00\x98
 0                 column=commande:1010_produit, timestamp=1333402542197, value=\x00\x00\x00}
1 row(s) in 0.0240 seconds

Malheureusement, je n’ai pas réussi à trouver le moyen de passer ces filtres dans une filterString de TScan, et l’utiliser via Thrift. Il est cependant utilisable dans un client natif.

Récemment, dans le cadre d’un stage que nous encadrons au taf, on est en train de découvrir certains nouveaux tools pour les tests unitaires et surtout fonctionnels (Il faut dire que Webtest de Canoo commence à se faire vieux, et nous pose plein de problème quand il s’agit de faire des tests AJAX).

On a donc décidé de prendre le taureau par les cornes et de retenter le coup avec Selenium, et de rénover notre stack de test, en encadrant le tout avec Behat, framework PHP pour faciliter le Behaviour Driven Development.

On aura donc besoin des composants suivants:

* Behat, comme le framework BDD;
* Mink, pour les tests fonctionnels;
* Selenium 2.

$ pear channel-discover pear.symfony.com
$ pear channel-discover pear.behat.org
$ pear install behat/behat
$ pear install behat/mink
$ wget http://selenium.googlecode.com/files/selenium-server-standalone-2.21.0.jar

A noter que je vais utiliser le driver Selenium2 (ou webdriver) et non le driver Selenium utilisé par la plupart des exemples que j’ai pu trouver jusqu’à maintenant sur le web.

Ah, et le script que je vais tester est:

<?php
if("POST" === $_SERVER['REQUEST_METHOD'])
{
   $param = $_POST['clear_text'];

   echo sha1($param);
   die();
}
?>
<html>
<head>
  <script src="http://ajax.googleapis.com/ajax/libs/jquery/1.7.2/jquery.min.js"></script>
  <script>
$(document).ready(function()
{
    $('#text').keyup(function(event) {
        // Use ajax to call script, using POST.
        $.ajax({
            type: "POST",
            url: "index.php",
            data: { clear_text: $('#text').val() }
        }).done(function(data) {
            $('#output').text(data);
        });
    });
});
  </script>
</head>
<body>
  <form>
    Input: <input type="text" id="text" /><br>
  </form>
  Output: <span id="output"></span>
</body>
</html>

(Pour les non initiés, il s’agit juste d’une balise input et d’un champs texte mis à jour suite à un appel ajax lancé suite à la modification de l’input. Le texte affiché est le résultat d’un sha1sum.)

Créons le dépôt pour les tests:

$ mkdir testing && cd testing
$ behat --init
+d features - place your *.feature files here
+d features/bootstrap - place bootstrap scripts and static files here
+f features/bootstrap/FeatureContext.php - place your feature related code here

Et après la lecture de la doc de behat (et en particulier Developing Web Applications with Behat and Mink), rédigons la feature:

# features/sha1.feature
Feature: Sha1
  In order to compute a sha1sum
  As a website user
  I need to be able to compte a sha1sum using an asynchronous script

  Scenario: Compute sha1sum
    Given I am on "/"
    When I fill in "text" with "my text"
    Then I should see "c3599c11e47719df18a2448690840c5dfcce3c80"

Et avant de tester la feature avec Behat, quelques petites modifications s’imposent dans features/bootstrap/FeatureContext.php. La classe contenue doit hériter de Behat\Mink\Behat\Context\MinkContext et le script doit require mink/autoload.php:

<?php

use Behat\Behat\Context\ClosuredContextInterface,
    Behat\Behat\Context\TranslatedContextInterface,
    Behat\Behat\Context\BehatContext,
    Behat\Behat\Exception\PendingException;
use Behat\Gherkin\Node\PyStringNode,
    Behat\Gherkin\Node\TableNode;

require_once 'mink/autoload.php';

class FeatureContext extends Behat\Mink\Behat\Context\MinkContext
{
};

?>

… et je crée un fichier de conf de base pour behat justement nommé config/behat.yml:

default:
  paths:
    features:       features
    bootstrap:      features/bootstrap
  context:
    parameters:
      base_url: http://localhost/~mycroft/testing/

Y a pu qu’à tester:

$ behat -c config/behat.yml sha1.feature
Feature: Sha1
  In order to compute a sha1sum
  As a website user
  I need to be able to compte a sha1sum using an asynchronous script

  Scenario: Compute sha1sum                                      # sha1.feature:6
    Given I am on "/"                                            # FeatureContext::visit()
    When I fill in "text" with "my text"                         # FeatureContext::fillField()
      Form submit button for field with xpath "(//html/.//*[self::input | self::textarea | self::select][not(./@type = 'submit' or ./@type = 'image' or ./@type = 'hidden')][(((./@id = 'text' or ./@name = 'text') or ./@id = //label[contains(normalize-space(string(.)), 'text')]/@for) or ./@placeholder = 'text')] | .//label[contains(normalize-space(string(.)), 'text')]//.//*[self::input | self::textarea | self::select][not(./@type = 'submit' or ./@type = 'image' or ./@type = 'hidden')])[1]"not found
    Then I should see "c3599c11e47719df18a2448690840c5dfcce3c80" # FeatureContext::assertPageContainsText()

1 scenario (1 failed)
3 steps (1 passed, 1 skipped, 1 failed)
0m0.019s

Bon, certes, il a lancé quelque chose, mais il l’a fait sans faire fonctionner Selenium. En fait par défaut, il va utiliser Goutte, qui est un pseudo navigateur en pur PHP sans client frontend, et donc pauvre en features pour les applications riches.

Configurons maintenant Behat pour fonctionner avec Selenium. Le fichier config/behat.yml devient:

default:
  paths:
    features:       features
    bootstrap:      features/bootstrap
  context:
    parameters:
      base_url: http://localhost/~mycroft/js/
      javascript_session: webdriver
      webdriver:
        host: 'http://localhost:4444/wd/hub'
        browser: firefox

Et n’oublions pas de spécifier avant le Scenario d’utiliser la session selenium, en imposant le tag @Javascript:


@javascript
Scenario: Compute sha1sum

Lancer Selenium va être nécessaire pour continuer:

$ java -jar selenium-server-standalone-2.21.0.jar
Apr 24, 2012 11:31:31 PM org.openqa.grid.selenium.GridLauncher main
INFO: Launching a standalone server
[...]

En lancant behat, cette fois-ci, on aura le navigateur qui se lancera et le test échouera:

Feature: Sha1
  In order to compute a sha1sum
  As a website user
  I need to be able to compte a sha1sum using an asynchronous script

  @javascript
  Scenario: Compute sha1sum                                      # sha1.feature:7
    Given I am on "/"                                            # FeatureContext::visit()
    When I fill in "text" with "my text"                         # FeatureContext::fillField()
    Then I should see "c3599c11e47719df18a2448690840c5dfcce3c80" # FeatureContext::assertPageContainsText()
      The text "c3599c11e47719df18a2448690840c5dfcce3c80" was not found anywhere in the text of the current page

1 scenario (1 failed)
3 steps (2 passed, 1 failed)
0m2.343s

Ca ne fonctionne pas parce que l’action « I fill in » remplit le champs et que donc l’évènement keyup ne fonctionnera jamais. Je modifie donc le scénario pour ajouter une nouvelle action custom:

...
    When I press letter after letter "my text" in "text"
...

… et je modifie la classe FeatureContext en y ajoutant le bon handler (à noter qu’on peut une fois après avoir fait hériter FeatureContext par Behat\Mink\Behat\Context\MinkContext lancer la commande behat -dl pour lister les handlers possibles):

    /**
     * @When /^I press letter after letter "([^"]*)" in "([^"]*)"$/
     */

    public function iPressLetterAfterLetterIn($text, $where)
    {
        $session = $this->getMink()->getSession();

        $xpath_elem = sprintf("//input[@id='%s']", $where);

        $session->getDriver()->focus($xpath_elem);

        for($i = 0; $i < strlen($text); $i ++)
        {
            $session->getDriver()->keypress($xpath_elem, $text[$i]);
            // Le keypress ne suffit pas pour déclencher l'évènement Javascript,
            // d'où la présence du keyup.
            $session->getDriver()->keyup($xpath_elem, $text[$i]);
            $session->wait(200);
        }
    }

… et je relance le test:

  @javascript
  Scenario: Compute sha1sum                                      # sha1.feature:7
    Given I am on "/"                                            # FeatureContext::visit()
    When I press letter after letter "my text" in "text"         # FeatureContext::iPressLetterAfterLetterIn()
    Then I should see "c3599c11e47719df18a2448690840c5dfcce3c80" # FeatureContext::assertPageContainsText()

1 scenario (1 passed)
3 steps (3 passed)
0m3.71s

Et mon test est validé !

Complétons mon code html et javascript pour y ajouter un bouton et une action:

...
    $('#clear').click(function() {
        $('#text').val('');
        $('#output').text('cleared!');
    });
...
    <input type="button" id="clear" value="clear" /><br>
...

Et mon scénario:

    When I press "clear"
    Then I should not see "c3599c11e47719df18a2448690840c5dfcce3c80"
    Then I should see "cleared!"

Et là, je relance behat:

  @javascript
  Scenario: Compute sha1sum                                          # sha1.feature:7
    Given I am on "/"                                                # FeatureContext::visit()
    When I press letter after letter "my text" in "text"             # FeatureContext::iPressLetterAfterLetterIn()
    Then I should see "c3599c11e47719df18a2448690840c5dfcce3c80"     # FeatureContext::assertPageContainsText()
    When I press "clear"                                             # FeatureContext::pressButton()
    Then I should not see "c3599c11e47719df18a2448690840c5dfcce3c80" # FeatureContext::assertPageNotContainsText()
    Then I should see "cleared!"                                     # FeatureContext::assertPageContainsText()

1 scenario (1 passed)
6 steps (6 passed)
0m3.767s

Et vous pourrez constater dans le navigateur qu’ouvre Selenium que tout va bien ! :-)

Apache Thrift est un framework qui propose un moteur de génération de code afin de mettre à disposition un service pouvant faire fonctionner ensemble des composantes logiciels de langages différents, comme le C++, Java, Python, PHP, …

Il prend en entrée un fichier de définition de protocole, un .thrift, dans lequel sont définis les types de données et les interfaces, et génère le code pour construire facilement des clients et serveurs RPC et qui pourront communiquer indépendamment de leur langage d’origine.

Comme le projet HBase propose ce fichier de définition et le daemon adéquat, il est possible de discuter en PHP ou en Python avec HBase en quelques minutes!

Installation de Thrift

Rien de bien compliqué. Il faut juste s’assurer que les packages -dev des languages désirés soient installés préalablement sur la machine. Comme je vais utiliser l’interface PHP, je m’assure que php-devel soit installé sur ma machine Fedora (ou php5-dev sous Debian/Ubuntu):

$ curl -k -C - -O "http://mirrors.ircam.fr/pub/apache/thrift/0.8.0/thrift-0.8.0.tar.gz"
[...]

$ tar xvfz thrift-0.8.0.tar.gz
[...]

$ cd thrift-0.8.0
$ ./configure --prefix=/opt/thrift --with-php --with-python --with-java
[...]

config.status: config.h is unchanged

thrift 0.8.0

Building code generators ..... :

Building C++ Library ......... : no
Building C (GLib) Library .... : no
Building Java Library ........ : yes
Building C# Library .......... : no
Building Python Library ...... : yes
Building Ruby Library ........ : no
Building Haskell Library ..... : no
Building Perl Library ........ : no
Building PHP Library ......... : yes
Building Erlang Library ...... : no
Building Go Library .......... : no

$ make
[...]

$ sudo make install
[...]

Generation des classes Thrift pour HBase

Le fichier protocole Hbase pour Thrift est se trouve dans src/main/resources/org/apache/hadoop/hbase/thrift/. Pour générer les classes en rapport, rien de plus simple:

$ /opt/thrift/bin/thrift --gen php /tmp/hbase-0.92.1/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift

$ ls -l gen-php/Hbase/
total 324
-rw-r--r-- 1 root root 292201 Apr  4 15:43 Hbase.php
-rw-r--r-- 1 root root  33001 Apr  4 15:43 Hbase_types.php

J’ai l’habitude, peut-être mauvaise, de prendre le code php de Thrift et de l’isoler. Mes futurs exemples nécessites donc de créer un lib/vendors/thrift avec tout le monde nécessaire:

$ mkdir -p thrift
$ cp -Rp /tmp/thrift-0.8.0/lib/php/src/* thrift/
$ mkdir -p thrift/packages/Hbase
$ cp gen-php/Hbase/* thrift/packages/Hbase/
$ rm -fr gen-php
$

Ne reste plus qu’à tester l’installation. Un petit script PHP pour ça…

<?php

$GLOBALS['THRIFT_ROOT'] = 'thrift/';

// Tout les require nécessaires ...
require_once($GLOBALS['THRIFT_ROOT'].'/Thrift.php' );
require_once($GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php' );
require_once($GLOBALS['THRIFT_ROOT'].'/transport/TBufferedTransport.php' );
require_once($GLOBALS['THRIFT_ROOT'].'/protocol/TBinaryProtocol.php' );

require_once($GLOBALS['THRIFT_ROOT'].'/packages/Hbase/Hbase.php' );

// Connexion à Thrift
$socket = new TSocket('localhost', 9090);
$socket->setSendTimeout( 10000 );
$socket->setRecvTimeout( 20000 );
$transport = new TBufferedTransport( $socket );
$protocol = new TBinaryProtocol( $transport );
$client = new HbaseClient( $protocol );

$transport->open();

// Appel de la fonction getTableNames (ie: commande 'list' dans HBase)
$tables = $client->getTableNames();
var_dump($tables);

// Fermeture de tout.
unset($client);
unset($protocol);
unset($transport);
unset($socket);

… que l’on va executer:

$ php test.php  
PHP Fatal error:  Uncaught exception 'TException' with message 'TSocket: Could not connect to localhost:9090 (Connection refused [111])' in /tmp/test_thrift/sample/lib/vendors/thrift/transport/TSocket.php:229

Ho, il semblerait que j’ai oublié la dernier étape, l’une des plus importantes: Le lancement de l’interface Thrift d’Hbase.

$ cd /tmp/hbase-0.92.1/
$ ./bin/hbase-daemon.sh start thrift
starting thrift, logging to /tmp/hbase-0.92.1/logs/hbase-root-thrift-lambda.out
$

et on re-teste notre script PHP:

$ php test.php
array(2) {
  [0]=>
  string(5) "achat"
  [1]=>
  string(4) "logs"
}

En listant les deux tables « achat » et « logs », on conclut qu’on a réussi à causer avec HBase en PHP !

En jettant un coup d’oeil dans HBase.php généré par Thrift, on peut voir la liste de fonctions de l’API HBase utilisables à travers thrift:

...
  public function enableTable($tableName);
  public function disableTable($tableName);
  public function isTableEnabled($tableName);
...
  public function getTableNames();
  public function getColumnDescriptors($tableName);
...
  public function getRow($tableName, $row);
  public function getRowWithColumns($tableName, $row, $columns);
...
  public function mutateRow($tableName, $row, $mutations);
...
  public function deleteAll($tableName, $row, $column);
  public function deleteAllRow($tableName, $row);
...
  public function scannerOpen($tableName, $startRow, $columns);
...
  public function scannerGet($id);
  public function scannerGetList($id, $nbRows);
  public function scannerClose($id);
...
</php>

Comme qui dirait, y a pu qu'à coder!

Récupération des columns d'
une table:

<code lang="php">$table_name = 'logs';

$columns_names = array();
$columns = $client->getColumnDescriptors($table_name);
foreach($columns as $name => $column)
{
    $columns_names[] = $name;
}

Ouverture d’un scanner et lecture des données avec scanner{Open,Get,Close}

Un scanner permet de récupérer les éléments d’une table un par un, en globalité ou avec des critères de filtrages.

Un scanner fonctionne ainsi: On l’ouvre avec scannerOpen (ou toute autre fonction scannerOpenXXX) en lui disant ce que l’on veut, puis on récupère un ou plusieurs éléments avec scannerGet ou scannerGetList.

En final, on peut fermer le scanner grâce à scannerClose.

$scanner = $client->scannerOpen($table_name, '', $columns_names);
do
{
    $obj = $client->scannerGet($scanner);
    // Un scannerGet renvoie toujours un objet tandis que scannerGetList peut
    // en renvoyer plusieurs. Dans les deux cas, il renvoie un tableau de Row,
    // d'où le [0] ici:

    $row = $obj[0];
    printRecord($row);
}
while(count($obj));

Ma fonction printRecord:

Ma fonction prend soin de bien afficher les valeurs, en utilisant la convention suivante: Si le nom de la colonne est suffixé par « _str », alors on traite une chaine de caractère. Sinon, c’est un entier, et dans ce cas on a le besoin d’utiliser la fonction unpack pour reformater l’entier.

function printRecord($record)
{
    echo "Row: " . $record->row . "\n";
    foreach($record->columns as $id => $cell)
    {
        $string_suffix = '_str';
        if($string_suffix === substr($id, strlen($id) - strlen($string_suffix), strlen($string_suffix)))
            echo $id . ": " . $cell->value . "\n";
        else
        {
            $val = unpack("Nint", $cell->value);
            echo $id . ": " . $val['int'] . "\n";
        }
    }
}

Sortie du scanner à l’exécution:

Row: 0
commande:1000_demande: 17
commande:1000_produit: 288
commande:1001_demande: 11
commande:1001_produit: 392
...
commande:1039_demande: 6
commande:1039_produit: 419
inventaire:1000_produit: 355
inventaire:1000_restant: 2
inventaire:1000_vendu: 7
inventaire:1001_produit: 66
inventaire:1001_restant: 5
inventaire:1001_vendu: 17
...
inventaire:vendu: 7
magasin:employee: 10
magasin:id: 11

Il faut comprendre une petite chose. Dans les posts précédents, les données stockées étaient des entiers. Hors, HBase ne fait pas la différence entre entiers, strings et les autres types car il stocke tout sous forme de byte array. Donc, pendant la lecture des données, il est nécessaire de transformer ces byte array sous forme d’integer, d’où l’utilisation d’unpack. De plus, je vais par la suite stocker des chaines de caractères, d’où l’utilisation de champs au nom suffixé par un « _str ».

Modification des enregistrements avec mutateRow:

// "mutateRow" permet de modifier un enregistrement à partir de son Id.
// Une mutation est une modification de colonne/valeur. Un appel à mutateRow peut modifier
// plusieurs valeurs car il prend un array d'objets Mutations:

// Row key:
$row_key = "my_new_row_120";
$mutations = array();
// Stockage d'une string
$mutations[] = new Mutation(array('column' => 'magasin:owner_str', 'value' => 'Patrick MARIE'));
// Stockage d'un entier
$mutations[] = new Mutation(array('column' => 'magasin:id', 'value' => pack("N", 117)));

$client->mutateRow($table_name, $row_key, $mutations);

// On va verifier que le changement est bien appliqué:

$obj = $client->getRow($table_name, $row_key);
printRecord($obj[0]);

/* output:
Row: my_new_row_120
magasin:id: 117
magasin:owner_str: Patrick MARIE
*/

Finalement, on pourrait effacer une colonne/valeur ou tout un enregistrement avec deleteAll ou deleteAllRow:

$client->deleteAll($table_name, $row_key, 'magasin:owner_str');
$obj = $client->getRow($table_name, $row_key);
printRecord($obj[0]);

/* output:
Row: my_new_row_120
magasin:id: 117
*/

Plus d’informations:

L’API HBase permet de développer, en Java, n’importe quel type d’application en utilisant HBase comme système de stockage.

Cette note va illustrer rapidement l’utilisation d’un HTable, des interfaces Get/Put, mais aussi Scan, son exploitation et l’extraction des résultats grace aux classes Result et KeyValue.

Comme à mon habitude, je préfère ne pas trop m’attarder sur les mots et donner des exemples concrets prèts à utilisation.

Pour illustrer cette utilisation de l’API, j’ai imaginé le scénario suivant: On va stocker le log d’une mise à jour de stocks et de commandes d’un réseau de magasins.

Imaginons donc une base centrale, censée recevoir des ordres de dizaines de magasins répartis à plein d’endroits dans le monde. Cette base centrale reçoit des ordres d’inventaires et de commandes par paquets et sont stockés tel quel. Nous voulons stocker ces ordres sans les découper spécifiquement pour un SGBD relationnel.

Le schéma est le suivant:

key:
  magasin => informations relatives au magasin envoyant l'ordre;
  inventaire => produits restants, dont le qualifieur (produit, restant ou vendu) est préfixé par un entier;
  commande => produits commandés, dont le qualifieur (produit, commande) est préfixé par un entier.

1234_1_2:
  magasin:id => 17
  magasin:employe => 14
  inventaire:date => '2011-12-01'
  inventaire:0001_produit => 12
  inventaire:0001_restant => 3
  inventaire:0001_vendu => 4
  inventaire:0002_produit => 14
  inventaire:0002_restant => 4
  commande:0001_produit => 12
  commande:0001_demande => 20

La table HBase créée sera donc:

hbase(main):001:0> create 'logs', { NAME => 'magasin' }, { NAME => 'inventaire' }, { NAME => 'commande' }
0 row(s) in 1.4900 seconds

1er exemple: Création d’un record dans une table.

Utilisation de HTable, représentation d’une table, de Put, de Get et finalement de Result.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.Result;

import java.io.IOException;

public class testPut {
    public static void main(String[] args) throws IOException {

        // Etape obligatoire, on charge la configuration HBase.
        Configuration conf = HBaseConfiguration.create();

        // Ouverture de la table. Son nom est "logs", comme on l'a créé plus tôt.
        HTable table = new HTable(conf, "logs");

        // Creation de l'objet Put, et remplissage.
        Put obj = new Put(Bytes.toBytes("1")); // key
        obj.add(Bytes.toBytes("magasin"),      // family
                Bytes.toBytes("id"),           // qualifier
                Bytes.toBytes("17"));          // value

        // Stockage de l'objet dans la table
        table.put(obj);


        // On prépare la "requête" pour récupérer la donnée dans un objet Get:
        Get retr = new Get(Bytes.toBytes("1"));

        // On lance celle ci,
        Result rec = table.get(retr);

        // On récupère la value dans la réponse Result:
        String value = new String(rec.getValue(Bytes.toBytes("magasin"),
                                               Bytes.toBytes("id")));

        System.out.println("Value for this row/column is " + value);
    }
}

Pour compiler et tester ça, j’ai un petit script tout moche:

$ cat build.sh
#!/bin/sh

HBASE_DIR=/tmp/hbase-0.92.1
CLASSPATH=$HBASE_DIR/hbase-0.92.1.jar
CLASSPATH=$CLASSPATH:$HBASE_DIR/lib/hadoop-core-1.0.0.jar
CLASSPATH=$CLASSPATH:$HBASE_DIR/lib/commons-logging-1.1.1.jar
CLASSPATH=$CLASSPATH:$HBASE_DIR/lib/log4j-1.2.16.jar
CLASSPATH=$CLASSPATH:$HBASE_DIR/lib/slf4j-api-1.5.8.jar
CLASSPATH=$CLASSPATH:$HBASE_DIR/lib/slf4j-log4j12-1.5.8.jar
CLASSPATH=$CLASSPATH:$HBASE_DIR/lib/commons-configuration-1.6.jar
CLASSPATH=$CLASSPATH:$HBASE_DIR/lib/commons-lang-2.5.jar
CLASSPATH=$CLASSPATH:$HBASE_DIR/lib/zookeeper-3.4.3.jar

echo CLASSPATH=$CLASSPATH

javac -cp $CLASSPATH testPut.java && \
java -cp $CLASSPATH:. testPut

$ sh build.sh
[...]
Value for this row/column is 17

On étend un peu le truc pour créer un générateur de données, ce qui va remplir notre base nous servir de base de travail. 500000 enregistrements, contenant entre 1 et 50 inventaires et 1 et 50 commandes, ça fait du environ 2.5 millions de valeurs utiles !

Le code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.Result;

import java.io.IOException;

import java.util.Random;

public class Generator {
    public void Generator()
    {
    }

    // Log generation.
    // A log is from a shop, identified by an id.
    // A log can have from 0 to 50 "inventaire" columns,
    // and from 0 to 50 "commande" columns.
    //
    // Everything is randomly generated.
    //
    public Put generateLog(int i)
    {
        Put newObj = new Put((new String("" + i)).getBytes());
        Random rand = new Random();

        // 30 shops
        newObj.add(Bytes.toBytes("magasin"),
                   Bytes.toBytes("id"),
                   Bytes.toBytes(rand.nextInt(30)));

        // There is between 0 and 30 employee working today
        newObj.add(Bytes.toBytes("magasin"),
                   Bytes.toBytes("employee"),
                   Bytes.toBytes(rand.nextInt(30)));

        int nb_inventaire = rand.nextInt(50);
        for(int j = 0; j < nb_inventaire; j ++)
        {
            int id = 1000 + j;

            newObj.add(Bytes.toBytes("inventaire"),
                       Bytes.toBytes(id + "_produit"),
                       Bytes.toBytes(rand.nextInt(500))); // 500 different products
            newObj.add(Bytes.toBytes("inventaire"),
                       Bytes.toBytes(id + "_restant"),
                       Bytes.toBytes(rand.nextInt(20)));
            newObj.add(Bytes.toBytes("inventaire"),
                       Bytes.toBytes(id + "_vendu"),
                       Bytes.toBytes(rand.nextInt(20)));
        }

        int nb_commande = rand.nextInt(50);
        for(int j = 0; j < nb_commande; j ++)
        {
            int id = 1000 + j;

            newObj.add(Bytes.toBytes("commande"),
                       Bytes.toBytes(id + "_produit"),
                       Bytes.toBytes(rand.nextInt(500))); // 500 different products
            newObj.add(Bytes.toBytes("commande"),
                       Bytes.toBytes(id + "_demande"),
                       Bytes.toBytes(rand.nextInt(20)));
        }

        return newObj;

    }

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "logs");

        Generator generator = new Generator();

        for(int i = 0; i < 500000; i ++)
        {
            Put obj = generator.generateLog(i);

            table.put(obj);

            if(i % 10000 == 0)
            {
                System.out.print(".");
                System.out.flush();
            }
        }

        System.out.println("");
    }
}

On compile et on lance le tout:

$ time sh build.sh
[...]
.......................

real    6m39.892s
user    1m32.956s
sys     0m21.131s

$

6 minutes 40 secondes pour insérer 500000 records. C’est pas chouette mais c’est clairement pas optimisé non plus. Il y a des tricks pour faciliter l’import de datas plus rapidement. Je reviendrai dessus à l’occasion.

On peut vérifier dans le shell hbase le nombre d’enregistrements. ‘count‘ permet de scanner une table afin de connaître ce nombre:

hbase(main):015:0> count "logs"
Current count: 497000, row: 97298
Current count: 498000, row: 98198
Current count: 499000, row: 99098
Current count: 500000, row: 99999
500000 row(s) in 30.2430 seconds

hbase(main):016:0> get "logs", 1303
COLUMN                                CELL
 commande:1000_demande                timestamp=1333402545108, value=\x00\x00\x00\x0B
 commande:1000_produit                timestamp=1333402545108, value=\x00\x00\x01A
 commande:1001_demande                timestamp=1333402545108, value=\x00\x00\x00\x03
 commande:1001_produit                timestamp=1333402545108, value=\x00\x00\x01[
 commande:1002_demande                timestamp=1333402545108, value=\x00\x00\x00\x08
 commande:1002_produit                timestamp=1333402545108, value=\x00\x00\x00\xCD
 inventaire:1000_produit              timestamp=1333402545108, value=\x00\x00\x007
 inventaire:1000_restant              timestamp=1333402545108, value=\x00\x00\x00\x0C
 inventaire:1000_vendu                timestamp=1333402545108, value=\x00\x00\x00\x05
 ...
 magasin:employee                     timestamp=1333402545108, value=\x00\x00\x00\x0C
 magasin:id                           timestamp=1333402545108, value=\x00\x00\x00\x0E

Petite remarque: les valeurs entières ne sont pas stockés au format texte, d’où la sortie un peu perturbante.

Seconde étape: Export de ces données

Instructions: Scan, ResultScanner, Result et KeyValue:

Il est possible de parcourir une table entière, élément par élément grace à l’interface « Scan ». Associé à une instance HTable, il nous fournit un ResultScanner et ce dernier renvoie, à l’appel « next », un Result.

Scan permet aussi de filtrer les éléments traités, notamment à l’aide de Filters, que je traiterai plus tard.

Encore du code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ResultScanner;

import java.io.IOException;

import java.util.ArrayList;

public class Explorator {
    public static void dumpResult(Result rec)
    {
        // Pour chaque valeur - identification par colonne:
        for(KeyValue kv : rec.list())
        {
            // Comme on n'a que des entiers...
            // Attention si l'on stocke des chaines de caracteres!
            int val = Bytes.toInt(kv.getValue());
            System.out.println(new String(kv.getFamily()) +
                               ":" + new String(kv.getQualifier()) +
                               " => " + val);
        }
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "logs");

        // Recuperation d'une donnee avec l'id 1303.
        Get retr = new Get(Bytes.toBytes("1303"));

        Result rec = table.get(retr);

        dumpResult(rec);

        // Initialisation d'un scan
        Scan scanner = new Scan();

        // On affecte le scan à la table. Cela renvoie une instance de
        // ResultScanner.
        ResultScanner rs = table.getScanner(scanner);

        for(int i = 0; i < 100; i ++)
        {
            // Parcours des éléments un à un avec l'instruction next.
            rec = rs.next();

            dumpResult(rec);
        }
    }
}

Quelques liens à consulter en rapport:

Quelques liens à lire !