Operating HBase from PHP via Thrift

HBase releases after 1.0 do not ship the Thrift IDL file (Hbase.thrift), so you need to download it from the official site.

Generate the PHP code:

thrift -gen php <PATH>/Hbase.thrift

Start HBase and the HBase Thrift daemon:

<PATH-TO-HBASE>/bin/start-hbase.sh
<PATH-TO-HBASE>/bin/hbase-daemon.sh start thrift
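Before running the example, you can check that the Thrift daemon is accepting TCP connections. Below is a minimal sketch using PHP's built-in fsockopen(). thrift_port_open is a hypothetical helper name, and it only confirms that something is listening on the port, not that it is the HBase Thrift server.

```php
<?php
/**
 * Hypothetical helper: return true if a TCP connection to $host:$port
 * succeeds within $timeout seconds. It does not verify the protocol.
 */
function thrift_port_open($host, $port, $timeout = 1.0)
{
    // @ suppresses the warning fsockopen emits when the connection is refused
    $fp = @fsockopen($host, $port, $errno, $errstr, $timeout);
    if ($fp === false) {
        return false;
    }
    fclose($fp);
    return true;
}
```

For example, thrift_port_open('127.0.0.1', 9090) should return true once the daemon is up on its default port.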

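The Thrift server listens on port 9090 by default. Below is an example covering common operations, saved as index.php, with this directory layout (gen-php/ holds the generated code and lib/ holds the Thrift PHP library):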

gen-php/
lib/
index.php
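If index.php fails with a missing-file or missing-class error, checking the layout above is a quick first step. A minimal sketch: the file list mirrors the require_once paths used in index.php, which assume the generator writes gen-php/Hbase/Hbase.php and gen-php/Hbase/Types.php; other Thrift versions may lay files out differently. missing_files is a hypothetical helper name.

```php
<?php
/**
 * Hypothetical helper: return the required files (relative to $baseDir)
 * that do not exist. The list mirrors the require_once paths in index.php.
 */
function missing_files($baseDir)
{
    $required = array(
        'lib/Thrift/ClassLoader/ThriftClassLoader.php', // Thrift PHP runtime
        'gen-php/Hbase/Hbase.php',                      // generated client
        'gen-php/Hbase/Types.php',                      // generated structs
    );
    $missing = array();
    foreach ($required as $file) {
        if (!is_file($baseDir . '/' . $file)) {
            $missing[] = $file;
        }
    }
    return $missing;
}
```

Calling missing_files(__DIR__) from the project directory should return an empty array when everything is in place.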

The data in HBase looks like this: the table is named blog and has two column families, article and author.

hbase(main):015:0> scan 'blog'
ROW          COLUMN+CELL
 1           column=article:content, timestamp=1434687037710, value=Hello,World
 1           column=article:tags, timestamp=1434533143824, value=Hadoop,HBase,NoSQL
 1           column=article:title, timestamp=1434533099302, value=Head First HBase
 1           column=author:name, timestamp=1434533489863, value=itsmikej1
 1           column=author:nickname, timestamp=1434687081713, value=\xE5\xB0\x8F\xE9\xBB\x91
1 row(s) in 0.0410 seconds
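The HBase shell prints bytes outside a small printable set as \xNN escapes, which is why author:nickname looks garbled: it is ordinary UTF-8 data (the two characters 小黑). When copying such values from shell output into PHP, a small decoder can turn them back into raw bytes. A sketch, assuming every \xNN sequence in the input is an escape produced by the shell; decode_shell_escapes is a hypothetical name.

```php
<?php
/**
 * Turn HBase shell output such as '\xE5\xB0\x8F' back into the raw bytes it
 * stands for. Assumes every backslash-x-hex-hex sequence is a shell escape.
 */
function decode_shell_escapes($text)
{
    return preg_replace_callback(
        '/\\\\x([0-9A-Fa-f]{2})/', // a literal backslash, "x", two hex digits
        function ($m) {
            return chr(hexdec($m[1]));
        },
        $text
    );
}

// Single quotes keep the backslashes literal, exactly as the shell printed them
$nickname = decode_shell_escapes('\xE5\xB0\x8F\xE9\xBB\x91');
```

$nickname now holds the UTF-8 bytes of 小黑, the same value a Thrift getRow call would return for that cell.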

index.php

<?php
/**
 * HBase operations
 *
 * @author itsmikej
 * @link https://mikej.me
 */

require_once __DIR__.'/lib/Thrift/ClassLoader/ThriftClassLoader.php';

use Thrift\ClassLoader\ThriftClassLoader;

$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift', __DIR__ . '/lib');
# $loader->registerDefinition("tService", realpath(__DIR__ . "/gen-php"));
$loader->register();

use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TSocket;
use Thrift\Transport\TBufferedTransport;

require_once __DIR__ . "/gen-php/Hbase/Hbase.php";
require_once __DIR__ . "/gen-php/Hbase/Types.php";


try {
    $socket = new TSocket('127.0.0.1', 9090);
    $socket->setSendTimeout(10000); # milliseconds
    $socket->setRecvTimeout(20000); # milliseconds
    $transport = new TBufferedTransport($socket);
    $protocol = new TBinaryProtocol($transport);
    $client = new Hbase\HbaseClient($protocol);
    $transport->open();

    # Use the blog table explicitly; current($client->getTableNames())
    # would pick whichever table happens to be listed first
    $table = 'blog';

    $mutations = array();
    $rowKey = '2';
    $row = array(
        'article' => array(
            'content' => 'Hello,Thrift',
            'tags' => 'Hbase',
            'title' => 'Thrift Test'
        ),
        'author' => array(
            'name' => 'tom',
            'nickname' => '小白',
            'sex' => 'man'  # columns can be added on the fly, no schema change needed
        )
    );

    foreach ($row as $column_family => $data) {
        foreach ($data as $qualifier => $content) {
            $info = array(
                'column' => $column_family. ':' .$qualifier,
                'value' => $content
            );
            $mutations[] = new \Hbase\Mutation($info);
        }
    }

    # Write a single row
    $client->mutateRow($table, $rowKey, $mutations, array());


    $rows = array(
        '3' => $row,
        '4' => $row,
    );
    $row['author']['sex'] = 'woman';
    $rows['5'] = $row;

    # Batch write several rows
    $batchRecord = array();
    foreach ($rows as $row_key => $row) {
        $mutations = array();
        foreach ($row as $column_family => $data) {

            foreach ($data as $qualifier => $content) {
                $info = array(
                    'column' => $column_family . ':' . $qualifier,
                    'value' => $content
                );
                $mutations[] = new \Hbase\Mutation($info);
            }
        }
        $batchRecord[] = new \Hbase\BatchMutation(array(
            'row' => $row_key,
            'mutations' => $mutations
        ));

    }
    $client->mutateRows($table, $batchRecord, array());


    # Delete an entire row
    $client->deleteAllRow($table, $rowKey, array());


    # Get a row (row 2 was deleted above, so read row 3 from the batch write)
    $ret = $client->getRow($table, '3', array());
    if (!empty($ret)) {
        var_dump($ret, $ret[0]->columns['author:name']->value);
    }


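    # Scan from row '1' (inclusive) to row '3' (exclusive), fetching at most 2 rows
    $scan = $client->scannerOpenWithStop($table, '1', '3', array(), array());
    $ret = $client->scannerGetList($scan, 2);
    var_dump($ret);
    $client->scannerClose($scan);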


    # Filter: rows whose author:name is tom AND author:sex is woman
    $filter = array();
    $filter[] = "SingleColumnValueFilter('author', 'name', =, 'binary:tom')";
    $filter[] = "SingleColumnValueFilter('author', 'sex', =, 'binary:woman')";
    $filterString = implode(" AND ", $filter);
    $scanFilter = new \Hbase\TScan();
    $scanFilter->filterString = $filterString;
    $scanFilter->startRow = '1';
    $scanFilter->stopRow = '6';
    # $scanFilter->columns = array('author'); # return only the author column family

    $scan = $client->scannerOpenWithScan($table, $scanFilter, array());
    $ret = $client->scannerGetList($scan, 2);
    var_dump($ret);
    $client->scannerClose($scan);

    $transport->close();

} catch (Exception $e) {
    var_dump($e->getMessage());
}
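The nested loops that turn $row into Mutation objects appear twice in index.php. One way to remove the duplication is a helper that flattens the family => qualifier => value array into the family:qualifier column names the Thrift API uses. A sketch; row_to_columns is a hypothetical name, and the Mutation construction is shown only as a comment because it needs the generated classes.

```php
<?php
/**
 * Flatten array('family' => array('qualifier' => value, ...), ...) into
 * array('family:qualifier' => value, ...).
 */
function row_to_columns(array $row)
{
    $columns = array();
    foreach ($row as $family => $qualifiers) {
        foreach ($qualifiers as $qualifier => $value) {
            $columns[$family . ':' . $qualifier] = $value;
        }
    }
    return $columns;
}

// With the generated classes loaded, mutations could then be built as:
// foreach (row_to_columns($row) as $column => $value) {
//     $mutations[] = new \Hbase\Mutation(array('column' => $column, 'value' => $value));
// }
```

Both the single-row write and the batch write could then share this one loop.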

Reference

The HBase reference guide lists the SingleColumnValueFilter arguments in the wrong order. The correct syntax is:

Syntax: SingleColumnValueFilter('<family>', '<qualifier>', <compare operator>, '<comparator>', <filterIfColumnMissing_boolean>, <latest_version_boolean>)
Syntax: SingleColumnValueFilter('<family>', '<qualifier>', <compare operator>, '<comparator>')
Example: "SingleColumnValueFilter('FamilyA', 'Column1', <=, 'binary:abc', true, false)"
Example: "SingleColumnValueFilter('FamilyA', 'Column1', <=, 'binary:abc')"
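Writing filter strings by hand makes it easy to get the argument order or the quoting wrong. Below is a sketch of a builder that follows the syntax above: family, qualifier and comparator are wrapped in single quotes, with any embedded single quote doubled (the escaping rule of the HBase filter language, as far as I know), while the compare operator and the two optional booleans stay unquoted. filter_quote and scvf are hypothetical names.

```php
<?php
/** Quote a filter-language string argument, doubling embedded single quotes. */
function filter_quote($s)
{
    return "'" . str_replace("'", "''", $s) . "'";
}

/**
 * Build a SingleColumnValueFilter expression. Pass both booleans for the
 * six-argument form, or neither for the four-argument form.
 */
function scvf($family, $qualifier, $op, $comparator, $filterIfMissing = null, $latestVersionOnly = null)
{
    $args = array(filter_quote($family), filter_quote($qualifier), $op, filter_quote($comparator));
    if ($filterIfMissing !== null && $latestVersionOnly !== null) {
        $args[] = $filterIfMissing ? 'true' : 'false';
        $args[] = $latestVersionOnly ? 'true' : 'false';
    }
    return 'SingleColumnValueFilter(' . implode(', ', $args) . ')';
}

// The same combined filter string that index.php builds by hand
$filterString = implode(' AND ', array(
    scvf('author', 'name', '=', 'binary:tom'),
    scvf('author', 'sex', '=', 'binary:woman'),
));
```

Passing true as $filterIfMissing drops rows that lack the column entirely; with the four-argument form such rows pass the filter.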

Tags: php, thrift, hbase
