Autre partie consacrée à HBase: Cette fois ci, je vais appréhender l’utilisation des Filters.
Connexion à HBase en Python
Pour compliquer la chose, je vais illustrer mes exemples en utilisant Python, donc toujours via Thrift.
Pour la génération des classes, rien de bien nouveau: On utilise thrift avec le generator « py »:
$ /opt/thrift/bin/thrift -gen py /tmp/hbase-0.92.1/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
$ ls -l gen-py
total 4
-rw-rw-r-- 1 mycroft mycroft 0 Apr 9 22:33 __init__.py
drwxrwxr-x 2 mycroft mycroft 4096 Apr 9 22:36 hbase
Voici l’ébauche du code d’un mini client:
import thrift
import sys
sys.path.append('gen-py')
from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
transport = TBufferedTransport(TSocket('localhost', 9090))
transport.open()
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)
print(client.getTableNames())
La connexion à HBase en Python étant effectuée, repassons rapidement au shell HBase pour introduire les filters.
Les Filters
Ayant vu les fonctions Get et Scan, HBase permet grâce aux filtres la sélection d’enregistrements de façon plus fine.
On pourra filtrer les enregistrements basé par la présence d’une colonne, par une valeur, par son timestamp ou
encore de façon aléatoire.
Les Filters fonctionnent avec des opérateurs de comparaison (LESS ‘<', GREATER '>‘, EQUAL ‘=’, LESS_OR_EQUAL ‘<=' ..) et des comparateurs qui seront de valeurs de comparaisons (ex: comparateur de valeur, comparateur de prefix, comparateur de bit, de regex, de sous chaine...)
Utilisons un filtre basique: RowFilter. Il permet de filtrer les datas en se basant sur la clef des enregistrements.
RowFilter prend en argument un opérateur de comparaison et un comparateur.
Pour utliser un Filter dans le shell HBase, on utilise la syntaxe suivante:
hbase(main):001:0> scan "logs", { FILTER => "(RowFilter(=, 'binary:2'))" }
ROW COLUMN+CELL
2 column=commande:1000_demande, timestamp=1333402542205, value=\x00\x00\x00\x03
2 column=commande:1000_produit, timestamp=1333402542205, value=\x00\x00\x01\xA4
2 column=commande:1001_demande, timestamp=1333402542205, value=\x00\x00\x00\x12
...
Le Filter est ici définit sous sa forme textuelle et signifie: donne les row où la clef est égale à 2.
On pourrait aussi dire: donne ls row où la clef est avant 2. On écrirait alors: « (RowFilter(<, 'binary:2'))". Dans ce cas, on remarquera que toutes les clefs commencant par « 1″ seront renvoyées: En effet, HBase n’indexe pas les clefs par ordre numérique mais par ordre alphabétique !
De la même façon, les Filters FamilyFilter, QualifierFilter et ValueFilter permettront respectivement de filtrer par familles, qualifiers et finalement valeur.
Exemple: « (FamilyFilter(=, ‘binaryprefix:invent’)) » retournera toutes les colonnes/valeurs dont le nom de la famille commence par « invent ».
Les Filters peuvent faire l’objet de l’utilisation d’opérateurs binaires. On pourra additionner deux Filter grâce à l’opérateur « AND », par exemple. Illustation: « (QualifierFilter(=, ‘regexstring:.*produit’) AND ValueFilter(=, ‘binary:\x00\x00\x00\x11′) » retournera toutes les colonnes/valeurs dont le qualifier répond à l’expression réguliaire « .*produit » et dont la valeur est égale à \x11.
Il existe plusieurs autres Filters (voir les classes qui héritent de FilterBase, et il est possible de les illustrer en Python.
# que j'ai utilisé dans les posts précédents
def printRow(obj):
print "Row: %s" % (obj.row)
for col in obj.columns:
if col.endswith('_str'):
print "%s: %s" % (col, obj.columns[col].value)
else:
val = unpack("!I", obj.columns[col].value)[0]
print "%s: %d" % (col, val)
# Creation d'un objet TScan. Cet objet va contenir la filterString qui va contenir le Filter
scan = Hbase.TScan()
scan.filterString = "(SingleColumnValueFilter('magasin', 'id', =, 'binary:%s'))" % pack("!I", 6)
# Ouverture du scanner ...
scanner = client.scannerOpenWithScan('logs', scan)
# ... le reste est simple à comprendre:
r = client.scannerGet(scanner)
while r:
printRow(r[0])
r = client.scannerGet(scanner)
break
client.scannerClose(scanner)
Ce filtre est un peu différent des autres filtres vus jusqu’à maintenant. Il ne fait pas que filtrer, mais sélectionne l’enregistrement entier basé uniquement sur la comparaison d’une valeur donnée.
L’exemple du code ci-dessus, utilisant SingleColumnValueFilter, évoquait « si magasin:id = 6, alors renvoie l’enregistrement ».
De la même façon, SingleColumnValueExcludeFilter effectue l’opération strictement inverse: Les enregistrements qui matchent seront exclu.
Un autre filter permet de sortir uniquement un certain nombre de colonne par enregistrement. Il s’agit de ColumnCountGetFilter. Ex:
Row: 0
commande:1002_demande: 7
commande:1001_produit: 392
commande:1001_demande: 11
commande:1000_produit: 288
commande:1000_demande: 17
Scanner finished
De la même façon FirstKeyOnlyFilter permet d’accéder à la première colonne d’une key/value.
Filters custom grâce à l’API native
Dans HBase 0.93, un RandomRowFilter existe. Malheureusement, dans la version 0.92.1 que j’utilise maintenant, il est présent mais sous la forme textuelle, il n’est pas supporté. Je me suis mis alors à chercher à développer un nouveau Filter par moi même. Donc, grâce à l’API native, il est possible de faire évoluer l’API et de créer son propre filtre !
Un Filter doit implémenter l’interface suivante:
public enum ReturnCode {
INCLUDE, SKIP, NEXT_COL, NEXT_ROW, SEEK_NEXT_USING_HINT
}
public void reset()
public boolean filterRowKey(byte[] buffer, int offset, int length)
public boolean filterAllRemaining()
public ReturnCode filterKeyValue(KeyValue v)
public void filterRow(List<KeyValue> kvs)
public boolean hasFilterRow()
public boolean filterRow()
public KeyValue getNextKeyHint(KeyValue currentKV)
}
Mon petit filtre qui va afficher au hasard des colonnes:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.hbase.KeyValue;
public class MyOwnRandomFilter extends FilterBase
{
protected static final Random random = new Random();
protected float chance;
public MyOwnRandomFilter()
{
super();
}
public MyOwnRandomFilter(float chance)
{
this.chance = chance;
}
@Override
public ReturnCode filterKeyValue(KeyValue kv)
{
float score = random.nextFloat();
if(score > chance)
{
return ReturnCode.NEXT_COL;
}
return ReturnCode.INCLUDE;
}
@Override
public void write(DataOutput dataOutput) throws IOException
{
dataOutput.writeFloat(chance);
}
@Override
public void readFields(DataInput dataInput) throws IOException
{
chance = dataInput.readFloat();
}
}
Pour builder:
HBASE_PATH=/tmp/hbase-0.92.1
CLASS_PATH=${HBASE_PATH}/hbase-0.92.1.jar:${HBASE_PATH}/lib/hadoop-core-1.0.0.jar
javac -cp $CLASS_PATH org/apache/hadoop/hbase/filter/MyOwnRandomFilter.java
jar cvf MyOwnRandomFilter.jar org/apache/hadoop/hbase/filter/MyOwnRandomFilter.class
Il faut inclure le .jar dans hbase. Pour cela, modifier conf/hbase-env.sh et compléter la variable HBASE_CLASSPATH:
export HBASE_CLASSPATH=/home/mycroft/dev/hbase_filter/MyOwnRandomFilter.jar
Y a plus qu’à tester dans le shell hbase:
hbase(main):021:0> scan "logs", { LIMIT => 1 , FILTER => org.apache.hadoop.hbase.filter.MyOwnRandomFilter.new(0.01) }
ROW COLUMN+CELL
0 column=commande:1004_produit, timestamp=1333402542197, value=\x00\x00\x00\x98
0 column=commande:1010_produit, timestamp=1333402542197, value=\x00\x00\x00}
1 row(s) in 0.0240 seconds
Malheureusement, je n’ai pas réussi à trouver le moyen de passer ces filtres dans une filterString de TScan, et l’utiliser via Thrift. Il est cependant utilisable dans un client natif.