无奈,网上关于C++访问Hbase的文章实在太少,所以只好自己折腾一下,然后写出来了。
要使用C++访问Hbase,可以走的途径少之又少,据说当前最好的方法就是通过Thrift来实现:http://thrift.apache.org/
所以本文分成几部分:(1)安装Thrift;(2)用Thrift 生成访问Hbase所需的C++文件;(3)在程序中通过Thrift来访问Hbase。
另外,本文只包含读写Hbase数据的例子,不包含配置Hbase的方法,如需这些内容,请自行搜索。
首先声明一下,本文基于以下环境:
操作系统:RHEL 5.3,64位
Thrift 版本:0.7.0
要访问的 Hbase 版本:0.20.6
我使用0.90.4的 Hbase 安装包来生成C++所需的Hbase.h等文件(用新版的应该能兼容旧版的)
下面开始,一步步来。
(1)安装Thrift
不是一件很轻松的事。如果你的系统比较干净,可能很顺利地就搞定了,如果有依赖库缺失问题或者库冲突问题,那么就只能根据具体情况,一个个问题去fix了,谁知道会有多少麻烦。
我运气比较好,在一个干净的系统上,很快就完成了。
Thrift 至少会依赖于两个系统中一般不会带的库:libevent,boost。
libevent 到这里下载:http://monkey.org/~provos/libevent/ 我使用的版本是:2.0.12-stable
boost 到这里下载:http://www.boost.org/ 我使用的版本是:1.47.0
安装libevent:
./configure --prefix=/usr/local/libevent make make install
安装boost(boost不像一般的Linux源码安装包一样,它的安装不是configure,make,make install,有点怪):
./bootstrap.sh --prefix=/usr/local/boost
不出错的话接着执行以下命令开始编译(也可以通过编辑project-config.jam文件调整编译参数):
./b2 ./b2 install
安装Thrift:
chmod +x configure ./configure --with-boost=/usr/local --prefix=/usr/local/thrift make make install
至此,安装Thrift 的工作就完成了。
(2)用Thrift 生成访问Hbase所需的C++文件
访问Hbase需要在你的程序中使用若干.h,.cpp文件,这些文件是用 Thrift 生成的。
解压Hbase源码安装包:
tar zxf hbase-0.90.4.tar.gz cd hbase-0.90.4
在解压出来的文件中, 你可以找到一个名为 Hbase.thrift 的文件,这个文件定义了如何通过 Thrift 接口来访问Hbase。用这个Thrift文件,可以生成访问Hbase所需的C++文件:
/usr/local/thrift/bin/thrift --gen cpp ./src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
会发现生成了gen-cpp目录:
ls gen-cpp/
输出:
Hbase_constants.cpp Hbase_constants.h Hbase.cpp Hbase.h Hbase_server.skeleton.cpp Hbase_types.cpp Hbase_types.h
除了Hbase_server.skeleton.cpp之外,其余文件都是在我们的程序里要用到的,将它们拷贝到我们的工程目录下。
(3)在程序中使用Thrift来访问Hbase
要能通过 Thrift 访问Hbase,你必须首先要打开HBase的 Thrift 服务,请参考其他文档确保这一点是可用的。
下一步,我们在程序中如何读取Hbase的数据?
我们先看看hbase源码安装包中自带的例子:在解压出来的安装包中的 examples/thrift/ 目录下的 DemoClient.cpp 文件,有如下代码:
boost::shared_ptr<TTransport> socket(new TSocket("localhost", 9090)); boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket)); boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); HbaseClient client(protocol); try { transport->open(); // do something transport->close(); } catch (TException &tx) { printf("ERROR: %s\n", tx.what()); }
我们就仿照这个例子来做。从DemoClient.cpp可见,我们要先创建三个指针socket,transport和protocol,后两个分别依赖于前两个,最后再创建一个client对象,我们操作Hbase就是使用这个client对象。在操作Hbase前,需要先打开到Hbase Thrift service的连接,即 transport->open(),在操作完 Hbase之后,需要关闭连接,即 transport->close(),这下就比较清楚了:我们可以写一个自己的类CHbaseOperate,它应该有一个connect函数和一个disconnect函数,分别用于打开、关闭连接,还应该有读写Hbase的基本功能。读写Hbase的方法,请参考Hbase.h中的函数,例子还是看DemoClient.cpp。
文章来源:http://www.codelast.com/
下面上代码:
HbaseOperate.h:
#ifndef __HBASE_OPERATE_H #define __HBASE_OPERATE_H #include <string> #include <protocol/TBinaryProtocol.h> #include <transport/TSocket.h> #include <transport/TTransportUtils.h> #include "Hbase.h" /** * Class to operate Hbase. * * @author Darran Zhang (codelast.com) * @version 11-08-24 * @declaration These codes are only for non-commercial use, and are distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. * You must not remove this declaration at any time. */ using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; using namespace apache::hadoop::hbase::thrift; typedef struct hbaseRet { std::string rowValue; time_t ts; hbaseRet() { ts = 0; } } hbaseRet; class CHbaseOperate { public: CHbaseOperate(); virtual ~CHbaseOperate(); private: boost::shared_ptr<TTransport> socket; boost::shared_ptr<TTransport> transport; boost::shared_ptr<TProtocol> protocol; HbaseClient *client; std::string hbaseServiceHost; int hbaseServicePort; bool isConnected; public: bool connect(); bool connect(std::string host, int port); bool disconnect(); bool putRow(const std::string &tableName, const std::string &rowKey, const std::string &column, const std::string &rowValue); bool getRow(hbaseRet &result, const std::string &tableName, const std::string &rowKey, const std::string &columnName); }; #endif
HbaseOperate.cpp:
#include "HbaseOperate.h" #include "log4cxx/log4cxx.h" #include "log4cxx/propertyconfigurator.h" static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("HbaseOperate.cpp")); /** * Class to operate Hbase. * * @author Darran Zhang (codelast.com) * @version 11-08-24 * @declaration These codes are only for non-commercial use, and are distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. * You must not remove this declaration at any time. */ using namespace std; CHbaseOperate::CHbaseOperate() : socket((TSocket*)NULL), transport((TBufferedTransport*)NULL), protocol((TBinaryProtocol*)NULL), client(NULL), hbaseServicePort(9090), isConnected(false) { } CHbaseOperate::~CHbaseOperate() { if (isConnected) { disconnect(); } if (NULL != client) { delete client; client = NULL; } } /** * Connect Hbase. * */ bool CHbaseOperate::connect() { if (isConnected) { LOG4CXX_INFO(logger, "Already connected, don't need to connect it again"); return true; } try { socket.reset(new TSocket(hbaseServiceHost, hbaseServicePort)); transport.reset(new TBufferedTransport(socket)); protocol.reset(new TBinaryProtocol(transport)); client = new HbaseClient(protocol); transport->open(); } catch (const TException &tx) { LOG4CXX_ERROR(logger, "Connect Hbase error : " << tx.what()); return false; } isConnected = true; return isConnected; } /** * Connect Hbase. * */ bool CHbaseOperate::connect(std::string host, int port) { hbaseServiceHost = host; hbaseServicePort = port; return connect(); } /** * Disconnect from Hbase. * */ bool CHbaseOperate::disconnect() { if (!isConnected) { LOG4CXX_ERROR(logger, "Haven't connected to Hbase yet, can't disconnect from it"); return false; } if (NULL != transport) { try { transport->close(); } catch (const TException &tx) { LOG4CXX_ERROR(logger, "Disconnect Hbase error : " << tx.what()); return false; } } else { return false; } isConnected = false; return true; } /** * Put a row to Hbase. * * @param tableName [IN] The table name. * @param rowKey [IN] The row key. * @param column [IN] The "column family : qualifier". * @param rowValue [IN] The row value. * @return True for successfully put the row, false otherwise. */ bool CHbaseOperate::putRow(const string &tableName, const string &rowKey, const string &column, const string &rowValue) { if (!isConnected) { LOG4CXX_ERROR(logger, "Haven't connected to Hbase yet, can't put row"); return false; } try { std::vector<Mutation> mutations; mutations.push_back(Mutation()); mutations.back().column = column; mutations.back().value = rowValue; client->mutateRow(tableName, rowKey, mutations); } catch (const TException &tx) { LOG4CXX_ERROR(logger, "Operate Hbase error : " << tx.what()); return false; } return true; } /** * Get a Hbase row. * * @param result [OUT] The object which contains the returned data. * @param tableName [IN] The Hbase table name, e.g. "MyTable". * @param rowKey [IN] The Hbase row key, e.g. "kdr23790". * @param columnName [IN] The "column family : qualifier". * @return True for successfully get the row, false otherwise. */ bool CHbaseOperate::getRow(hbaseRet &result, const std::string &tableName, const std::string &rowKey, const std::string &columnName) { if (!isConnected) { LOG4CXX_ERROR(logger, "Haven't connected to Hbase yet, can't read data from it"); return false; } std::vector<std::string> columnNames; columnNames.push_back(columnName); std::vector<TRowResult> rowResult; try { client->getRowWithColumns(rowResult, tableName, rowKey, columnNames); } catch (const TException &tx) { LOG4CXX_ERROR(logger, "Operate Hbase error : " << tx.what()); return false; } if (0 == rowResult.size()) { LOG4CXX_WARN(logger, "Got no record with the key : [" << rowKey << "]"); return false; } std::map<std::string, TCell>::const_iterator it = rowResult[rowResult.size() -1].columns.begin(); result.rowValue = it->second.value; result.ts = it->second.timestamp; return true; }
注意我在程序中使用了Apache log4cxx这个记录日志的库来打印/保存程序运行日志,使用方法可参考【此链接】。如果你不想用,可以自己改为std::cout。
代码有了,使用方法为:可以在你的程序中创建一个全局对象:
CHbaseOperate g_ho;
在需要操作Hbase之前:
g_ho.connect("192.168.55.66", 9090);
其中,“192.168.55.66”和9090分别是你的Hbase Thrift service的服务器地址和端口号,你需要正确地配置好,才能使用。本文开头已经说了,本文不讨论这方面的问题。
在操作完Hbase之后:
g_ho.disconnect();
文章来源:http://www.codelast.com/
现在再来说一下读写操作Hbase的两个函数:putRow()和getRow()。
putRow():
bool putRow(const std::string &tableName, const std::string &rowKey, const std::string &column, const std::string &rowValue);
这是向Hbase写入一条记录的函数,参数tableName为Hbase表名,即你要将记录写到哪个Hbase表中;参数rowKey为待写入的记录的key;参数column为待写入的记录的“column family:qualifier”组合,参数rowValue为待写入的记录的value。
getRow():
bool getRow(hbaseRet &result, const std::string &tableName, const std::string &rowKey, const std::string &columnName);
这是从Hbase中读取一条记录的函数,参数tableName为Hbase表名,即你要从哪个Hbase表中读取记录;参数rowKey为你要查询的记录的key;参数columnName为你要查询的记录的“column family:qualifier”组合;参数result为返回的Hbase的数据,它包含记录的value和记录的时间戳:
typedef struct hbaseRet { std::string rowValue; time_t ts; hbaseRet() { ts = 0; } } hbaseRet;
文章来源:http://www.codelast.com/
至于操作的结果对不对,可以在Hbase shell中用get, scan等命令来验证,具体方法请看Hbase shell的help。另外,最好再写一些unit test来测试。
如果你要为CHbaseOperate类添加功能,可以参考Hbase.h文件中的函数定义。如你所见,CHbaseOperate类主要也是调用了里面的函数,只不过这个类可以让一些不太熟悉Hbase概念的人可以更方便地操作Hbase罢了。
文章来源:https://www.codelast.com/
➤➤ 版权声明 ➤➤
转载需注明出处:codelast.com
感谢关注我的微信公众号(微信扫一扫):
client在disconnect的时候没有释放,再次连接的时候将造成内存泄漏!
很不错,期待继续更新,多谢了
请教一下你的main函数是怎样的?(QQ:970846570)
实验室项目要用到这个
lz的东西不错 直接能用
谢一个
楼上的可以加一个好友吗?(QQ:970846570)
楼上 我是一个研一的学生,想加你好友,可以吗?(QQ:970846570)