June 2015

Operating HBase from PHP via Thrift

HBase releases after 1.0 no longer ship the Thrift IDL source (Hbase.thrift); you have to download it from the official website.

Generate the PHP files:

thrift -gen php <PATH>/Hbase.thrift

Start HBase and its Thrift daemon:

<PATH-TO-HBASE>/bin/start-hbase.sh
<PATH-TO-HBASE>/bin/hbase-daemon.sh start thrift

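By default the Thrift daemon listens on port 9090. Below is an example of common operations, saved as index.php, with this directory layout:

gen-php/     PHP code generated from Hbase.thrift
lib/         Thrift PHP runtime library (contains the Thrift/ directory)
index.php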
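Before running index.php it can help to confirm that something is actually accepting connections on port 9090. A minimal Python sketch using only the standard library; the helper name `thrift_port_open` and its defaults are illustrative assumptions, and a successful TCP connect only shows that a process is listening, not that it speaks the HBase Thrift protocol.

```python
import socket


def thrift_port_open(host: str = "127.0.0.1", port: int = 9090, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host and opens a TCP socket
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # connection refused, host unreachable, or timed out
        return False
```

Calling `thrift_port_open()` returns False if the daemon from hbase-daemon.sh has not started yet.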

The data in HBase looks like this: the table is named blog and has two column families, article and author.

hbase(main):015:0> scan 'blog'
ROW          COLUMN+CELL
 1           column=article:content, timestamp=1434687037710, value=Hello,World
 1           column=article:tags, timestamp=1434533143824, value=Hadoop,HBase,NoSQL
 1           column=article:title, timestamp=1434533099302, value=Head First HBase
 1           column=author:name, timestamp=1434533489863, value=itsmikej1
 1           column=author:nickname, timestamp=1434687081713, value=\xE5\xB0\x8F\xE9\xBB\x91
1 row(s) in 0.0410 seconds
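The shell prints non-ASCII bytes as \xNN escapes, so the author:nickname value above is simply the UTF-8 encoding of 小黑. The sketch below (helper names are hypothetical) parses such scan output into a row key to `family:qualifier` map and undoes the escaping. It also mimics how an HBase scan selects rows: keys are compared as raw bytes, the start row is inclusive and the stop row exclusive, so the scannerOpenWithStop call in index.php (start row 1, stop row 3) covers rows 1 and 2, and would also include a key such as 10.

```python
import re
from typing import Dict, List

# One COLUMN+CELL line of `scan` output, e.g.
#  1   column=article:title, timestamp=1434533099302, value=Head First HBase
CELL_RE = re.compile(
    r"^\s*(?P<row>\S+)\s+column=(?P<family>[^:]+):(?P<qualifier>\S+),"
    r" timestamp=(?P<ts>\d+), value=(?P<value>.*)$"
)
HEX_ESCAPE_RE = re.compile(r"\\x([0-9A-Fa-f]{2})")


def unescape_shell_value(text: str) -> str:
    """Undo the shell's \\xNN escaping: rebuild the raw bytes, then decode them as UTF-8."""
    raw = bytearray()
    pos = 0
    for m in HEX_ESCAPE_RE.finditer(text):
        raw += text[pos:m.start()].encode("utf-8")  # printable text before the escape
        raw.append(int(m.group(1), 16))             # the escaped byte itself
        pos = m.end()
    raw += text[pos:].encode("utf-8")
    return raw.decode("utf-8")


def parse_scan(output: str) -> Dict[str, Dict[str, str]]:
    """Group cells as row key -> {"family:qualifier": value}."""
    rows: Dict[str, Dict[str, str]] = {}
    for line in output.splitlines():
        m = CELL_RE.match(line)
        if m:  # the prompt, header and "1 row(s) in ..." lines do not match
            column = f"{m['family']}:{m['qualifier']}"
            rows.setdefault(m["row"], {})[column] = unescape_shell_value(m["value"])
    return rows


def scan_range(rows: Dict[str, Dict[str, str]], start: str, stop: str) -> List[str]:
    """Row keys in [start, stop), ordered and compared as bytes like an HBase scan."""
    lo, hi = start.encode(), stop.encode()
    return [k for k in sorted(rows, key=str.encode) if lo <= k.encode() < hi]
```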

index.php

<?php
/**
 * HBase operations
 *
 * @author itsmikej
 * @link https://mikej.me
 */

require_once __DIR__.'/lib/Thrift/ClassLoader/ThriftClassLoader.php';

use Thrift\ClassLoader\ThriftClassLoader;

$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift', __DIR__ . '/lib');
# $loader->registerDefinition("tService", realpath(__DIR__ . "/gen-php"));
$loader->register();

use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TSocket;
use Thrift\Transport\TBufferedTransport;

require_once __DIR__ . "/gen-php/Hbase/Hbase.php";
require_once __DIR__ . "/gen-php/Hbase/Types.php";


try {
    $socket = new TSocket('127.0.0.1', 9090);
    $socket->setSendTimeout(10000);
    $socket->setRecvTimeout(20000);
    $transport = new TBufferedTransport($socket);
    $protocol = new TBinaryProtocol($transport);
    $client = new Hbase\HbaseClient($protocol);
    $transport->open();

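    # list all tables, then work on the blog table explicitly
    # (current($tables) would just pick whichever table comes first)
    $tables = $client->getTableNames();
    $table = 'blog';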

    $mutations = array();
    $rowKey = '2';
    $row = array(
        'article' => array(
            'content' => 'Hello,Thrift',
            'tags' => 'Hbase',
            'title' => 'Thrift Test'
        ),
        'author' => array(
            'name' => 'tom',
            'nickname' => '小白',
            'sex' => 'man'  # new columns can be added on the fly
        )
    );

    foreach ($row as $column_family => $data) {
        foreach ($data as $qualifier => $content) {
            $info = array(
                'column' => $column_family. ':' .$qualifier,
                'value' => $content
            );
            $mutations[] = new \Hbase\Mutation($info);
        }
    }

    # insert a single row
    $client->mutateRow($table, $rowKey, $mutations, array());


    $rows = array(
        '3' => $row,
        '4' => $row,
    );
    $row['author']['sex'] = 'woman';
    $rows['5'] = $row;

    # batch insert rows 3, 4 and 5
    $batchRecord = array();
    foreach ($rows as $row_key => $row) {
        $mutations = array();
        foreach ($row as $column_family => $data) {

            foreach ($data as $qualifier => $content) {
                $info = array(
                    'column' => $column_family . ':' . $qualifier,
                    'value' => $content
                );
                $mutations[] = new \Hbase\Mutation($info);
            }
        }
        $batchRecord[] = new \Hbase\BatchMutation(array(
            'row' => $row_key,
            'mutations' => $mutations
        ));

    }
    $client->mutateRows($table, $batchRecord, array());


    # read the row back (before deleting it, otherwise $ret is empty)
    $ret = $client->getRow($table, $rowKey, array());
    var_dump($ret, $ret[0]->columns['author:name']->value);


    # delete the row
    $client->deleteAllRow($table, $rowKey, array());


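    # scan from row '1' (inclusive) to row '3' (exclusive), fetching at most 2 rows
    $scan = $client->scannerOpenWithStop($table, '1', '3', array(), array());
    $ret = $client->scannerGetList($scan, 2);
    $client->scannerClose($scan);
    var_dump($ret);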


    # filter: author:name == 'tom' AND author:sex == 'woman'
    $filter = array();
    $filter[] = "SingleColumnValueFilter('author', 'name', =, 'binary:tom')";
    $filter[] = "SingleColumnValueFilter('author', 'sex', =, 'binary:woman')";
    $filterString = implode(" AND ", $filter);
    $scanFilter = new \Hbase\TScan();
    $scanFilter->filterString = $filterString;
    $scanFilter->startRow = '1';
    $scanFilter->stopRow = '6';
    # $scanFilter->columns = array('author'); # return only the author column family

    $scan = $client->scannerOpenWithScan($table, $scanFilter, array());
    $ret = $client->scannerGetList($scan, 2);
    $client->scannerClose($scan);
    var_dump($ret);

    $transport->close();

} catch (Exception $e) {
    var_dump($e->getMessage());
}
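The nested loops in index.php flatten a family => qualifier => value array into one mutation per `family:qualifier` column, and for mutateRows wrap each row's list together with its row key. The same transformation as a Python sketch over plain dicts; the dicts only mirror the field names of the generated Mutation and BatchMutation structs (column, value, row, mutations) and are not the generated classes. Because only column families are part of the table schema, a qualifier such as author:sex needs no prior declaration.

```python
from typing import Dict, List

Row = Dict[str, Dict[str, str]]  # family -> qualifier -> value


def row_to_mutations(row: Row) -> List[dict]:
    """Flatten one row into Mutation-like dicts with a 'family:qualifier' column name."""
    return [
        {"column": f"{family}:{qualifier}", "value": value}
        for family, cells in row.items()
        for qualifier, value in cells.items()
    ]


def rows_to_batch(rows: Dict[str, Row]) -> List[dict]:
    """One BatchMutation-like dict per row key, the shape mutateRows expects."""
    return [{"row": key, "mutations": row_to_mutations(row)} for key, row in rows.items()]
```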


The argument order for SingleColumnValueFilter given in the HBase manual is wrong; the correct form is:

Syntax: SingleColumnValueFilter('<family>', '<qualifier>', <compare operator>, '<comparator>', <filterIfColumnMissing_boolean>, <latest_version_boolean>)
Syntax: SingleColumnValueFilter('<family>', '<qualifier>', <compare operator>, '<comparator>')
Example: "SingleColumnValueFilter ('FamilyA', 'Column1', <=, 'abc', true, false)"
Example: "SingleColumnValueFilter ('FamilyA', 'Column1', <=, 'abc')"
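Building these strings by hand makes it easy to slip back into the wrong argument order. A small Python sketch that always emits family, qualifier, operator, comparator, with the two booleans optional but only as an unquoted pair; the helper names are hypothetical, it assumes the filter language's rule that a single quote inside a quoted argument is escaped by doubling it, and the comparator is passed in the prefixed form index.php uses (binary:...).

```python
from typing import Iterable, Optional, Tuple


def quote(value: str) -> str:
    """Quote a filter-language string argument, doubling any embedded single quote."""
    return "'" + value.replace("'", "''") + "'"


def single_column_value_filter(family: str, qualifier: str, op: str, comparator: str,
                               flags: Optional[Tuple[bool, bool]] = None) -> str:
    """SingleColumnValueFilter(family, qualifier, op, comparator[, filterIfMissing, latestVersionOnly])."""
    args = [quote(family), quote(qualifier), op, quote(comparator)]
    if flags is not None:
        filter_if_missing, latest_version_only = flags
        # the two booleans are written unquoted and only as a pair
        args += ["true" if filter_if_missing else "false",
                 "true" if latest_version_only else "false"]
    return "SingleColumnValueFilter(" + ", ".join(args) + ")"


def combine_and(clauses: Iterable[str]) -> str:
    """Join clauses with AND, like implode(" AND ", $filter) in index.php."""
    return " AND ".join(clauses)
```

For example, `single_column_value_filter("author", "name", "=", "binary:tom")` produces the first clause used in index.php.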

Using Thrift

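Thrift is a framework for communication between programs written in different languages: data types and service interfaces are described once in Thrift's IDL, and the Thrift compiler generates client and server code for each target language.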

First, define the interface in Thrift's IDL and save it as hello.thrift:

namespace php hello

enum SexType {
  MALE = 1,
  FEMALE = 2
}

struct User {
  1: string firstname,
  2: string lastname,
  3: i32 user_id = 0,
  4: SexType sex,
  5: bool active = false,
  6: optional string description
}

exception InvalidValueException {
  1: i32 error_code,
  2: string error_msg
}

service UserExchange {
  void ping(),
  i32 add_user(1:User u) throws (1: InvalidValueException e),
  User get_user(1:i32 uid) throws (1: InvalidValueException e),
  oneway void clear_list()
}
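The IDL fixes names, field ids and types; the bytes on the wire come from the protocol chosen at runtime. Both programs below use TBinaryProtocol over a buffered (unframed) transport, so a call to ping(), which has no arguments, is just a message header followed by an empty argument struct. A hand-rolled Python sketch of that one case, assuming strict-mode headers (which the PHP and Python TBinaryProtocol implementations write by default); it is for illustration only, not a replacement for the generated stubs.

```python
import struct

VERSION_1 = 0x80010000  # strict-mode version marker in TBinaryProtocol
CALL = 1                # TMessageType.CALL
T_STOP = 0              # field type byte that terminates a struct


def encode_call(method: str, seqid: int, msg_type: int = CALL) -> bytes:
    """Encode a strict TBinaryProtocol message header plus an empty argument struct."""
    name = method.encode("utf-8")
    msg = struct.pack("!I", VERSION_1 | msg_type)  # version + message type
    msg += struct.pack("!i", len(name)) + name     # method name as a length-prefixed string
    msg += struct.pack("!i", seqid)                # sequence id
    return msg + struct.pack("!b", T_STOP)         # no arguments: just the STOP byte
```

With sequence id 0, `encode_call("ping", 0)` is 17 bytes: 80010001, 00000004, the ASCII bytes of "ping", 00000000 and a final 00.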

Generate the PHP code:

thrift -r --gen php hello.thrift

Generate the Python code:

thrift -r --gen py hello.thrift

The Python server, saved as python_server.py:

#!/usr/bin/env python

import sys
sys.path.append('./gen-py')

from hello import UserExchange
from hello.ttypes import *

from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer

users = []

class UserManagerHandler:
    def __init__(self):
        pass
        #self.log = {}

    def ping(self):
        print('ping()')

    def add_user(self, user):
        if user.firstname is None:
            raise InvalidValueException(1, 'no firstname exception')
        if user.lastname is None:
            raise InvalidValueException(2, 'no lastname exception')
        if user.user_id <= 0:
            raise InvalidValueException(3, 'wrong user_id')
        if user.sex != SexType.MALE and user.sex != SexType.FEMALE:
            raise InvalidValueException(4, 'wrong sex id')
        print('Processing user ' + user.firstname + ' ' + user.lastname)
        users.append(user)
        print(users)
        return True

    def get_user(self, user_id):
        # user_id is used as an index into the users list
        if user_id < 0 or user_id >= len(users):
            raise InvalidValueException(5, 'wrong id')
        return users[user_id]

    def clear_list(self):
        print('Clearing list')
        print(users)
        del users[:]
        print(users)



handler = UserManagerHandler()
processor = UserExchange.Processor(handler)
transport = TSocket.TServerSocket(port=9090)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()

server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)

# You could do one of these for a multithreaded server
#server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)
#server = TServer.TThreadPoolServer(processor, transport, tfactory, pfactory)

print('Starting the server...')
server.serve()
print('done.')

The PHP client, saved as php_client.php:

<?php

require_once __DIR__.'/lib/Thrift/ClassLoader/ThriftClassLoader.php';

use Thrift\ClassLoader\ThriftClassLoader;

$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift', __DIR__ . '/lib');
$loader->registerDefinition("tService", realpath(__DIR__ . "/gen-php"));
$loader->register();

use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TSocket;
use Thrift\Transport\THttpClient;
use Thrift\Transport\TBufferedTransport;
use Thrift\Exception\TException;

# generated file names follow the service name UserExchange (case matters on Linux)
require_once __DIR__ . "/gen-php/hello/UserExchange.php";
require_once __DIR__ . "/gen-php/hello/Types.php";

try {

    $socket = new TSocket('localhost', 9090);
    $transport = new TBufferedTransport($socket, 1024, 1024);
    $protocol = new TBinaryProtocol($transport);
    $client = new \hello\UserExchangeClient($protocol);

    $transport->open();
    $client->ping();
    $u = new \hello\User();
    $u->user_id = 1;
    $u->firstname = 'John';
    $u->lastname = 'Smith';
    $u->sex = \hello\SexType::MALE;
    if ($client->add_user($u))
    {
        echo 'user added succesfully</br>'."\n";
    }
    
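    var_dump($client->get_user(0));

    $client->clear_list();

    # an empty User has no firstname, so the server raises InvalidValueException
    $u2 = new \hello\User();
    $client->add_user($u2);

} catch (\hello\InvalidValueException $e) {
    echo $e->error_msg."<br/>\n";
} catch (TException $e) {
    # transport or protocol errors, e.g. the server is not running
    echo $e->getMessage()."<br/>\n";
}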

Finally, start the service with python_server.py and then run php_client.php. If everything works, the server prints:

Starting the server...
ping()
Processing user John Smith
[User(user_id=1, description=None, firstname='John', lastname='Smith', sex=1, active=False)]
Clearing list
[User(user_id=1, description=None, firstname='John', lastname='Smith', sex=1, active=False)]
[]

and the client prints:

user added succesfully</br>
class hello\User#9 (6) {
  public $firstname =>
  string(4) "John"
  public $lastname =>
  string(5) "Smith"
  public $user_id =>
  int(1)
  public $sex =>
  int(1)
  public $active =>
  bool(false)
  public $description =>
  NULL
}
no firstname exception<br/>

This shows that the requests and responses were handled correctly.
