HadoopDB
Image sources: http://hadoop.apache.org/ and http://wiki.postgresql.org/
Although combining the two logos above does not reflect HadoopDB's original intent, HadoopDB does use PostgreSQL as its example database during development. So what is HadoopDB? It proposes, and implements, running a database on every DataNode and integrating them into a single service architecture. Decades of research have gone into traditional database technology, while Hadoop, popular as it is, stores data by default on HDFS, a simple file-based storage model. If data management is handed over to databases instead, can the performance the database world has accumulated be brought into Hadoop? That is the direction HadoopDB explores. HadoopDB has been described as "an architectural hybrid of MapReduce and DBMS technologies", and some also describe it as "an open source parallel database".
Image source: http://hadoopdb.sourceforge.net/guide/
The figure above is HadoopDB's architecture. An SMS Planner translates SQL into MapReduce jobs, and at the bottom layer there is another translation that turns the MapReduce data access back into SQL against the node-local databases. I won't go into the details here; if you are interested, read the paper.
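To make that bottom-layer idea concrete, here is a tiny Python sketch of what the pushdown amounts to conceptually. This is not HadoopDB's actual connector code (the real thing runs inside map tasks over JDBC); it simply issues the same query against each node-local partition with psql and concatenates the results. The hostnames, databases, and the hadoop/1234 login are the ones set up later in this post, so treat them as assumptions here.
#!/usr/bin/python
# Conceptual sketch only, not HadoopDB code: run the same SQL on every node-local
# PostgreSQL partition and merge the partial results, which is roughly what the
# SMS-planned map tasks do over JDBC.
import os

os.environ['PGPASSWORD'] = '1234'      # avoid psql's interactive password prompt
partitions = [('Cluster01', 'udb0'), ('Cluster02', 'udb1'), ('Cluster03', 'udb2')]
query = 'SELECT name FROM raw'

rows = []
for host, db in partitions:
    # -t prints tuples only, -A disables alignment, so each output line is one value
    out = os.popen('psql -t -A -h %s -U hadoop -c "%s" %s' % (host, query, db)).read()
    rows.extend(line for line in out.splitlines() if line.strip())

print '\n'.join(rows)                  # the merged result of the per-partition queries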
Since HadoopDB was developed against Hadoop 0.19.x, I set it up with Hadoop 0.19.2. For the cluster setup itself, see my earlier post "[Linux] 安裝 Hadoop 0.20.1 Multi-Node Cluster @ Ubuntu 9.10"; installing 0.19.2 differs from 0.20.1 only slightly: the settings placed in hadoop-0.20.1/conf/core-site.xml and hadoop-0.20.1/conf/mapred-site.xml
simply go into hadoop-0.19.2/conf/hadoop-site.xml instead. The walkthrough below continues from that tutorial, using a 3-node cluster with machines named Cluster01, Cluster02 and Cluster03, operated with the hadoop account on every machine.
- First, build a 3-node cluster running Hadoop 0.19.x
- HadoopDB Quick Start Guide
- JDBC driver: for Java 1.6, use postgresql-8.4-701.jdbc4.jar
- Another good installation write-up: HadoopDB 實做
- Below, a prompt written as hadoop@Cluster0X:~ means the step has to be done on all of Cluster01 through Cluster03
- Install and configure PostgreSQL on every machine
- Install it and create a hadoop database role, assuming the password is 1234
- hadoop@Cluster0X:~$ sudo apt-get install postgresql
- hadoop@Cluster0X:~$ sudo vim /etc/postgresql/8.4/main/pg_hba.conf
- #local all all ident
local all all trust
# IPv4 local connections:
#host all all 127.0.0.1/32 md5
host all all 127.0.0.1/32 password
host all all 192.168.0.1/16 password # add the IP range of the cluster machines
# IPv6 local connections:
#host all all ::1/128 md5
host all all ::1/128 password
- hadoop@Cluster0X:~$ sudo /etc/init.d/postgresql-8.4 restart
- hadoop@Cluster0X:~$ sudo su - postgres
- postgres@Cluster0X:~$ createuser hadoop
- Shall the new role be a superuser? (y/n) y
postgres@Cluster01:~$ psql
psql (8.4.2)
Type "help" for help.
postgres=# alter user hadoop with password '1234';
ALTER ROLE
postgres=# \q
- Test whether the other machines can connect
- hadoop@Cluster01:~$ createdb testdb
- hadoop@Cluster02:~$ psql -h Cluster01 testdb
- Error message
- psql: FATAL: no pg_hba.conf entry for host "192.168.56.168", user "hadoop", database "testdb", SSL on
FATAL: no pg_hba.conf entry for host "192.168.56.168", user "hadoop", database "testdb", SSL off
- Successful output
- Password:
psql (8.4.2)
SSL connection (cipher: DHE-RSA-AES256-SHA, bits: 256)
Type "help" for help.
testdb=#
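The same connectivity test can be scripted across all nodes at once instead of logging in by hand. A small sketch, assuming the three hostnames above, the hadoop/1234 role, and that the default postgres database is reachable:
#!/usr/bin/python
# Verify that every node's PostgreSQL accepts remote password logins, i.e. that the
# pg_hba.conf edits above took effect (if a node is unreachable at all, also check
# listen_addresses in postgresql.conf).
import os

os.environ['PGPASSWORD'] = '1234'      # avoid the interactive password prompt
for host in ['Cluster01', 'Cluster02', 'Cluster03']:
    status = os.system("psql -h %s -U hadoop -c 'SELECT version();' postgres > /dev/null" % host)
    print host, ':', 'OK' if status == 0 else 'FAILED'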
- Configure HadoopDB
- hadoop@Cluster0X:~$ cp hadoopdb.jar HADOOP_HOME/lib/
- hadoop@Cluster0X:~$ cp postgresql-8.4-701.jdbc4.jar HADOOP_HOME/lib/
- hadoop@Cluster0X:~$ vim HADOOP_HOME/conf/hadoop-site.xml
<property>
<name>hadoopdb.config.file</name>
<value>HadoopDB.xml</value>
<description>The name of the HadoopDB cluster configuration file</description>
</property>
<property>
<name>hadoopdb.fetch.size</name>
<value>1000</value>
<description>The number of records fetched from JDBC ResultSet at once</description>
</property>
<property>
<name>hadoopdb.config.replication</name>
<value>false</value>
<description>Tells HadoopDB Catalog whether replication is enabled.
Replica locations need to be specified in the catalog.
False causes replica information to be ignored.</description>
</property>
- hadoop@Cluster01:~$ vim nodes.txt
- 192.168.56.168
192.168.56.169
192.168.56.170
- hadoop@Cluster01:~$ vim Catalog.properties
- #Properties for Catalog Generation
##################################
nodes_file=nodes.txt
# Relations Name and Table Name are the same
relations_unchunked=raw
relations_chunked=poi
catalog_file=HadoopDB.xml
##
#DB Connection Parameters
##
port=5432
username=hadoop
password=1234
driver=org.postgresql.Driver
url_prefix=jdbc\:postgresql\://
##
#Chunking properties
##
# the number of databases on a node
chunks_per_node=3
# for udb0 ,udb1, udb2 ( 3 nodes = 0 ~ 2 )
unchunked_db_prefix=udb
# for cdb0 ,cdb1, ... , cdb8 ( 3 nodes x 3 chunks = 0~8 )
chunked_db_prefix=cdb
##
#Replication Properties
##
dump_script_prefix=/root/dump_
replication_script_prefix=/root/load_replica_
dump_file_u_prefix=/mnt/dump_udb
dump_file_c_prefix=/mnt/dump_cdb
##
#Cluster Connection
##
ssh_key=id_rsa-gsg-keypair
- hadoop@Cluster01:~$ java -cp lib/hadoopdb.jar edu.yale.cs.hadoopdb.catalog.SimpleCatalogGenerator Catalog.properties
- The generated HadoopDB.xml will look something like this:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<DBClusterConfiguration xmlns="http://edu.yale.cs.db.hadoop/DBConfigurationSchema">
<Nodes Password="1234" Username="hadoop" Driver="org.postgresql.Driver" Location="192.168.56.168">
<Relations id="raw">
<Partitions url="jdbc:postgresql://192.168.56.168:5432/udb0" id="0"/>
</Relations>
<Relations id="poi">
<Partitions url="jdbc:postgresql://192.168.56.168:5432/cdb0" id="0"/>
<Partitions url="jdbc:postgresql://192.168.56.168:5432/cdb1" id="1"/>
<Partitions url="jdbc:postgresql://192.168.56.168:5432/cdb2" id="2"/>
</Relations>
</Nodes>
<Nodes Password="1234" Username="hadoop" Driver="org.postgresql.Driver" Location="192.168.56.169">
<Relations id="raw">
<Partitions url="jdbc:postgresql://192.168.56.169:5432/udb1" id="1"/>
</Relations>
<Relations id="poi">
<Partitions url="jdbc:postgresql://192.168.56.169:5432/cdb3" id="3"/>
<Partitions url="jdbc:postgresql://192.168.56.169:5432/cdb4" id="4"/>
<Partitions url="jdbc:postgresql://192.168.56.169:5432/cdb5" id="5"/>
</Relations>
</Nodes>
<Nodes Password="1234" Username="hadoop" Driver="org.postgresql.Driver" Location="192.168.56.170">
<Relations id="raw">
<Partitions url="jdbc:postgresql://192.168.56.170:5432/udb2" id="2"/>
</Relations>
<Relations id="poi">
<Partitions url="jdbc:postgresql://192.168.56.170:5432/cdb6" id="6"/>
<Partitions url="jdbc:postgresql://192.168.56.170:5432/cdb7" id="7"/>
<Partitions url="jdbc:postgresql://192.168.56.170:5432/cdb8" id="8"/>
</Relations>
</Nodes>
</DBClusterConfiguration>
- hadoop@Cluster01:~$ hadoop dfs -put HadoopDB.xml HadoopDB.xml
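Optionally, before creating the databases by hand in the next step, it helps to double-check what the catalog actually assigned to each node. A small sketch that parses the generated file; the namespace string comes from the sample XML above, and ElementTree ships with Python 2.6:
#!/usr/bin/python
# Sanity check: list which databases every node is expected to host, straight from
# the generated HadoopDB.xml.
import xml.etree.ElementTree as ET

NS = '{http://edu.yale.cs.db.hadoop/DBConfigurationSchema}'
root = ET.parse('HadoopDB.xml').getroot()
for node in root.findall(NS + 'Nodes'):
    print node.get('Location')
    for rel in node.findall(NS + 'Relations'):
        for part in rel.findall(NS + 'Partitions'):
            print '    relation %s, partition %s -> %s' % (rel.get('id'), part.get('id'), part.get('url'))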
- Create the tables, load the test data into each machine's database, and create the corresponding table in Hive
- Take the raw table as the example here. HadoopDB.xml lists three partitions for the raw table (udb0, udb1 and udb2 in the sample above), so the corresponding database has to be created on each machine those entries point to
- hadoop@Cluster01:~$ createdb udb0
hadoop@Cluster02:~$ createdb udb1
hadoop@Cluster03:~$ createdb udb2
- Then create the table matching the data to be loaded
- hadoop@Cluster01:~$ psql udb0
udb0=#
CREATE TABLE raw (
ID int,
NAME varchar(300)
); - likewise on Cluster02 and Cluster03
- Load the data
- hadoop@Cluster01:~$ psql udb0
udb0=# COPY RAW FROM '/home/hadoop/p0' WITH DELIMITER E'\t' ; - the file /home/hadoop/p0 is produced by splitting the original large input file with the partitioning tool shipped with HadoopDB
- $ hadoop jar lib/hadoopdb.jar edu.yale.cs.hadoopdb.dataloader.GlobalHasher src_in_hdfs out_in_hdfs 3 '\n' 0
- $ hadoop fs -get out_in_hdfs/part-00000 /home/hadoop/p0
- This assumes the data sits in /home/hadoop/p0 with tab-separated fields
- The same has to be done on Cluster02 and Cluster03 (a row-count sanity check is sketched at the end of this section)
- Finally, create the corresponding table in Hive (this only needs to be run on one machine)
- Assume the tables Hive uses will be stored under /db in HDFS
- hadoop@Cluster01:~ $ hadoop dfs -mkdir /db
- hadoop@Cluster01:~ $ SMS_dist/bin/hive
CREATE EXTERNAL TABLE raw (
ID int,
NAME string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS
INPUTFORMAT 'edu.yale.cs.hadoopdb.sms.connector.SMSInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/db/raw'; - the basename of /db/raw must match the table name (the same name used in each node's database and in the Hive table), and don't forget to convert the column types (e.g. varchar becomes string)
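As promised above, here is a quick way to confirm that each partition really received its rows before querying through Hive. A small sketch, again assuming the hostnames, databases, and the hadoop/1234 login used above; the total should match the line count of the original input file in HDFS:
#!/usr/bin/python
# Count the rows that landed in each unchunked partition after the COPY steps above.
import os

os.environ['PGPASSWORD'] = '1234'
partitions = [('Cluster01', 'udb0'), ('Cluster02', 'udb1'), ('Cluster03', 'udb2')]

total = 0
for host, db in partitions:
    out = os.popen("psql -t -A -h %s -U hadoop -c 'SELECT count(*) FROM raw;' %s" % (host, db)).read()
    count = int(out.strip() or 0)
    print '%s / %s : %d rows' % (host, db, count)
    total += count
print 'total rows :', total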
- Once everything above is configured, run $ SMS_dist/bin/hive on one machine (e.g. Cluster01) to see the result
- hadoop@Cluster01:~ $ SMS_dist/bin/hive
hive> show tables;
hive> select name from raw;
The example above uses the unchunked mode. In chunked mode, with three machines and three databases per machine, the setup multiplies to 3 x 3 = 9 databases, each of which needs its database created, table created, data loaded, and so on, so in practice you should script it. The HadoopDB site has a sample script that creates a /my_data directory on every node; I rewrote it slightly:
#!/usr/bin/python
import sys, os, thread, commands
import getopt

DEBUG_MODE = True
completed = {}

create_db_cmd_list = [ ''
    , 'createdb testdb'
    , 'echo "CREATE TABLE Helo ( ID int );" | psql testdb'
    , 'dropdb testdb'
]
cmd_list = []
cmd_list.extend( create_db_cmd_list )

def ParseHadoopXML( file_path ) :
    return

def executeThread( node, *args ):
    # Make sure key is accessible and is the correct key name.
    #os.system("ssh -i key -o 'StrictHostKeyChecking=no' %s \'mkdir /my_data \'" %(node))
    # You could replace the mkdir command with another command line or add more command lines,
    # as long as you prefix the command with the ssh connection.
    if DEBUG_MODE :
        print "\tShow Only"
    for cmd in cmd_list :
        if cmd == None or cmd == '' :
            continue
        cmd = cmd.strip()
        if cmd == '' :
            continue
        cmd_exec = "ssh %s \'%s\'" % (node , cmd )
        print "\t" , cmd_exec
        if DEBUG_MODE == False :
            os.system( cmd_exec )
    completed[node] = "true"

def main( argv=None ):
    hostfile = "nodes.txt"
    internalips = open(hostfile,'r').readlines()
    for i in internalips:
        os.system('sleep 1')
        node_info = i.strip() ,
        thread.start_new_thread(executeThread, node_info )
    while (len(completed.keys()) < len(internalips)):
        os.system('sleep 2')
    print "Execution Completed"

if __name__ == "__main__":
    main()
Finally, here is my own much larger rewrite. If you want to use it, I strongly recommend trying it in a virtual environment first to confirm the workflow is right; ideally, go through the manual setup once yourself and only adapt the script to your needs after the flow is clear. Before running it, prepare the following:
- Put the raw data in HDFS; by default records are separated by newlines and the fields in each record by tabs, for example:
- 1 changyy
2 hello world
3 hadoop
- On the local machine, create nodes.txt listing the IP of every cluster machine, one per line ('\n'), for example:
- 192.168.56.168
192.168.56.169
192.168.56.170
- Provide the column definitions for the table; by default the script reads them from a file named table_create, for example:
- ID int ,
NAME varchar(250)
- Finally, run $ python this.py --help to see what can be configured (described in my rather broken English)
Show only the commands that would be executed; pay attention to which directories and databases it is going to delete
$ python this.py --source_dir_in_hdfs src
Actually run it
$ python this.py --source_dir_in_hdfs src --go
The default is unchunked; use --chunk_num to set the number of chunks per node
$ python this.py --source_dir_in_hdfs src --chunk_num 3
An example run:
- hadoop@Cluster01:~$ cat nodes.txt
192.168.56.168
192.168.56.169
192.168.56.170
- hadoop@Cluster01:~$ cat table_create
ID int,
NAME varchar(250)
- hadoop@Cluster01:~$ python batch_setup.py --source_dir_in_hdfs src
Current Status is just Debug Mode for show all commands
please set '-g' or '--go' option to execute them after check all commands.(look at the 'rm -rf' and 'hadoop fs -rmr')
$ /usr/bin/java -cp /home/hadoop/lib/hadoopdb.jar edu.yale.cs.hadoopdb.catalog.SimpleCatalogGenerator /tmp/Catalog.properties
=> Start to put the HadoopDB.xml into HDFS
$ /home/hadoop/bin/hadoop fs -rmr HadoopDB.xml
$ /home/hadoop/bin/hadoop fs -put HadoopDB.xml HadoopDB.xml
=> The data source(src) would be partitioned into 3 parts(tmp_out_hadoopdb) by the delimiter (\n)
$ /home/hadoop/bin/hadoop fs -rmr tmp_out_hadoopdb
$ /home/hadoop/bin/hadoop jar /home/hadoop/lib/hadoopdb.jar edu.yale.cs.hadoopdb.dataloader.GlobalHasher src tmp_out_hadoopdb 3 '\n' 0
=> To configure your nodes...
ssh 192.168.56.168 "dropdb udb_hadoopdb_0"
ssh 192.168.56.168 "createdb udb_hadoopdb_0"
ssh 192.168.56.168 "echo \"create table hadoopdb ( id int, name varchar(250) );\" | psql udb_hadoopdb_0"
ssh 192.168.56.168 "rm -rf /tmp/out_for_global_parition"
ssh 192.168.56.168 "/home/hadoop/bin/hadoop fs -get tmp_out_hadoopdb/part-00000 /tmp/out_for_global_parition"
ssh 192.168.56.168 "echo \"COPY hadoopdb FROM '/tmp/out_for_global_parition' WITH DELIMITER E'\t';\" | psql udb_hadoopdb_0"
ssh 192.168.56.168 "rm -rf /tmp/out_for_global_parition"
ssh 192.168.56.170 "dropdb udb_hadoopdb_2"
ssh 192.168.56.170 "createdb udb_hadoopdb_2"
ssh 192.168.56.170 "echo \"create table hadoopdb ( id int, name varchar(250) );\" | psql udb_hadoopdb_2"
ssh 192.168.56.170 "rm -rf /tmp/out_for_global_parition"
ssh 192.168.56.170 "/home/hadoop/bin/hadoop fs -get tmp_out_hadoopdb/part-00002 /tmp/out_for_global_parition"
ssh 192.168.56.170 "echo \"COPY hadoopdb FROM '/tmp/out_for_global_parition' WITH DELIMITER E'\t';\" | psql udb_hadoopdb_2"
ssh 192.168.56.170 "rm -rf /tmp/out_for_global_parition"
ssh 192.168.56.169 "dropdb udb_hadoopdb_1"
ssh 192.168.56.169 "createdb udb_hadoopdb_1"
ssh 192.168.56.169 "echo \"create table hadoopdb ( id int, name varchar(250) );\" | psql udb_hadoopdb_1"
ssh 192.168.56.169 "rm -rf /tmp/out_for_global_parition"
ssh 192.168.56.169 "/home/hadoop/bin/hadoop fs -get tmp_out_hadoopdb/part-00001 /tmp/out_for_global_parition"
ssh 192.168.56.169 "echo \"COPY hadoopdb FROM '/tmp/out_for_global_parition' WITH DELIMITER E'\t';\" | psql udb_hadoopdb_1"
ssh 192.168.56.169 "rm -rf /tmp/out_for_global_parition"
$ /home/hadoop/bin/hadoop fs -rmr tmp_out_hadoopdb
=> To setup the external table for Hive
$ /home/hadoop/bin/hadoop fs -mkdir /db
$ /home/hadoop/bin/hadoop fs -rmr /db/hadoopdb
$ echo "drop table hadoopdb;" | /home/hadoop/SMS_dist/bin/hive
$ echo "create external table hadoopdb ( id int, name string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS INPUTFORMAT 'edu.yale.cs.hadoopdb.sms.connector.SMSInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION '/db/hadoopdb'; " | /home/hadoop/SMS_dist/bin/hive
=> All Execution Completed...
#!/usr/bin/python
# At Python 2.6.4
# Yuan-Yi Chang
# 2010/01/07 15:09
#
import sys, os, thread, commands
import re, os.path
from optparse import OptionParser

BIN_JAVA = '/usr/bin/java'
BIN_HADOOP = '/home/hadoop/bin/hadoop'
BIN_HIVE = '/home/hadoop/SMS_dist/bin/hive'
JAR_HADOOPDB = '/home/hadoop/lib/hadoopdb.jar'

completed = {}
cmd_for_node = {}

def initHadoopDB( data_in_hdfs = None , data_delimiter = '\n' , data_field_delimiter = '\t' ,
                  data_partition_out = None ,
                  nodes_in_file = 'nodes.txt' , chunks_per_node = 3 ,
                  table_name = None , table_field_info = None ,
                  db_user = 'hadoop' , db_pass = '1234' , db_field_delimiter = '|' , hive_db_dir_in_hdfs = '/db' ,
                  tmp_path_for_catelog = '/tmp/Catalog.properties' ,
                  out_hadoop_xml = 'HadoopDB.xml' , hadoop_xml_in_hdfs = 'HadoopDB.xml' ,
                  DEBUG_MODE = True ) :
    if data_in_hdfs is None :
        print 'Please input the path of the data source in HDFS'
        return False
    if data_partition_out is None :
        print 'Please input the path for the data source partition in HDFS'
        return False
    if table_name is None or re.match( '/[a-z0-9_]+/' , table_name ) :
        print 'Please input the table name with [a-z0-9_] only'
        return False
    if table_field_info is None or os.path.isfile( table_field_info ) is False :
        print 'Please check the "table_field_info" : ' + str(table_field_info)
        return False
    if os.path.isfile( nodes_in_file ) is False :
        print 'Please check the "nodes_in_file" : ' + nodes_in_file
        return False
    if chunks_per_node < 0 :
        print 'Please check the "chunks_per_node" : ' + str(chunks_per_node) + ' , 0 for no chunk'
        return False

    data_delimiter = data_delimiter.replace( '\n' , '\\n' ).replace( '\t' , '\\t' )
    data_field_delimiter = data_field_delimiter.replace( '\t' , '\\t' ).replace( '\n' , '\\n' )
    db_field_delimiter = db_field_delimiter.replace( '\t' , '\\t' ).replace( '\n' , '\\n' )

    # Properties for Catalog Generation
    make_catelog = ''
    make_catelog += 'nodes_file=' + nodes_in_file + '\n'
    if chunks_per_node < 2 :
        make_catelog += 'relations_chunked=no_use' + '\n'
        make_catelog += 'relations_unchunked=' + table_name + '\n'
    else:
        make_catelog += 'relations_unchunked=' + 'no_use' + '\n'
        make_catelog += 'relations_chunked=' + table_name + '\n'
    make_catelog += 'catalog_file=' + out_hadoop_xml + '\n'
    # DB Connection Parameters
    make_catelog += 'port=5432' + '\n'
    make_catelog += 'username=' + db_user + '\n'
    make_catelog += 'password=' + db_pass + '\n'
    make_catelog += 'driver=org.postgresql.Driver' + '\n'
    make_catelog += 'url_prefix=jdbc\\:postgresql\\://' + '\n'
    # Chunking properties
    make_catelog += 'chunks_per_node=' + str(chunks_per_node) + '\n'
    make_catelog += 'unchunked_db_prefix=udb_' + table_name + '_' + '\n'
    make_catelog += 'chunked_db_prefix=cdb_' + table_name + '_' + '\n'
    # Replication Properties
    make_catelog += 'dump_script_prefix=/root/dump' + '\n'
    make_catelog += 'replication_script_prefix=/root/load_replica_' + '\n'
    make_catelog += 'dump_file_u_prefix=/mnt/dump_udb' + '\n'
    make_catelog += 'dump_file_c_prefix=/mnt/dump_cdb' + '\n'
    # Cluster Connection
    make_catelog += 'ssh_key=id_rsa-gsg-keypair' + '\n'

    try:
        f = open( tmp_path_for_catelog , 'w' )
        f.write( make_catelog )
        f.close()
    except:
        print 'Error to write a catelog:' + tmp_path_for_catelog
        return False

    cmd_exec = BIN_JAVA + ' -cp ' + JAR_HADOOPDB + ' edu.yale.cs.hadoopdb.catalog.SimpleCatalogGenerator ' + tmp_path_for_catelog
    if DEBUG_MODE :
        print '$ ' + cmd_exec
    else:
        os.system( cmd_exec )

    if os.path.isfile( out_hadoop_xml ) is False :
        print 'Please check the "out_hadoop_xml" : ' + out_hadoop_xml
        return False

    print '\n=> Start to put the HadoopDB.xml into HDFS\n'
    if DEBUG_MODE :
        print '$ ' + BIN_HADOOP + ' fs -rmr ' + hadoop_xml_in_hdfs
        print '$ ' + BIN_HADOOP + ' fs -put ' + out_hadoop_xml + ' ' + hadoop_xml_in_hdfs
    else:
        os.system( BIN_HADOOP + ' fs -rmr ' + hadoop_xml_in_hdfs )
        os.system( BIN_HADOOP + ' fs -put ' + out_hadoop_xml + ' ' + hadoop_xml_in_hdfs )

    partition_num = 0
    node_list = []
    try:
        tmp_list = open( nodes_in_file , 'r' ).readlines()
        for line in tmp_list :
            line = line.strip()
            if line <> '' :
                node_list.append( line )
        partition_num = len( node_list )
    except:
        print 'Please check the "nodes_in_file" : ' + nodes_in_file
        return False

    if partition_num > 1 :
        cmd_exec = BIN_HADOOP + ' jar ' + JAR_HADOOPDB + ' edu.yale.cs.hadoopdb.dataloader.GlobalHasher ' + data_in_hdfs + ' ' + data_partition_out + ' ' + str(partition_num) + ' \'' + data_delimiter + '\' 0 '
        print '\n=> The data source(' + data_in_hdfs + ') would be partitioned into ' + str(partition_num) + ' parts(' + data_partition_out + ') by the delimiter (' + data_delimiter + ')\n'
        if DEBUG_MODE :
            print '$ ' + BIN_HADOOP + ' fs -rmr ' + data_partition_out
            print '$ ' + cmd_exec
        else:
            os.system( BIN_HADOOP + ' fs -rmr ' + data_partition_out )
            os.system( cmd_exec )
    else:
        print '\n=> The number of datanodes should be > 1\n'
        return False

    HadoopDB_Info = ''
    try:
        HadoopDB_Info = open( out_hadoop_xml , 'r' ).read()
    except:
        print 'Error at read "out_hadoop_xml" : ' + out_hadoop_xml
        return False
    if HadoopDB_Info is '' :
        print 'The info in the file is empty : ' + HadoopDB_Info
        return False

    DB_TABLE_CREATE_INFO = ''
    try:
        DB_TABLE_CREATE_INFO = open( table_field_info , 'r' ).read().strip()
    except:
        print 'Error at read "table_field_info" : ' + table_field_info
        return False
    if DB_TABLE_CREATE_INFO is '' :
        print 'The info in the file is empty : ' + DB_TABLE_CREATE_INFO
        return False
    DB_TABLE_CREATE_INFO = DB_TABLE_CREATE_INFO.replace( "\n" , ' ' ).replace( '"' , '\\"' ).lower()
    DB_TABLE_CREATE_INFO = 'create table ' + table_name + ' ( ' + DB_TABLE_CREATE_INFO + ' );'

    #print node_list
    partition_index = 0
    for node in node_list:
        cmd_for_node[ node ] = []
        if chunks_per_node is 0 : # use unchunked mode
            db_list = re.findall( '' + node + ':[\d]+/(udb_' + table_name + '_' + '[\w]+)' , HadoopDB_Info )
            for sub_db in db_list :
                # Create Database & Table
                cmd_for_node[ node ].append( 'dropdb ' + sub_db )
                cmd_for_node[ node ].append( 'createdb ' + sub_db )
                cmd_for_node[ node ].append( 'echo "' + DB_TABLE_CREATE_INFO + '" | psql ' + sub_db )
                cmd_for_node[ node ].append( 'rm -rf /tmp/out_for_global_parition' )
                cmd_for_node[ node ].append( BIN_HADOOP + ' fs -get ' + data_partition_out + '/part-%0.5d /tmp/out_for_global_parition' % partition_index )
                cmd_for_node[ node ].append( 'echo "COPY ' + table_name + ' FROM \'/tmp/out_for_global_parition\' WITH DELIMITER E\'' + data_field_delimiter + '\';" | psql ' + sub_db )
                cmd_for_node[ node ].append( 'rm -rf /tmp/out_for_global_parition' )
        else:
            db_list = re.findall( '' + node + ':[\d]+/(cdb_' + table_name + '_' + '[\w]+)' , HadoopDB_Info )
            if db_list <> None :
                cmd_for_node[ node ].append( 'rm -rf /tmp/*out_for_global_parition' )
                cmd_for_node[ node ].append( BIN_HADOOP + ' fs -get ' + data_partition_out + '/part-%0.5d /tmp/out_for_global_parition' % partition_index )
                cmd_for_node[ node ].append( 'cd /tmp; ' + BIN_JAVA + ' -cp ' + JAR_HADOOPDB + ' edu.yale.cs.hadoopdb.dataloader.LocalHasher out_for_global_parition ' + str( chunks_per_node ) + ' \'' + data_delimiter + '\' 0 ' )
                sub_part = 0
                for sub_db in db_list :
                    # Create Database & Table
                    cmd_for_node[ node ].append( 'dropdb ' + sub_db )
                    cmd_for_node[ node ].append( 'createdb ' + sub_db )
                    cmd_for_node[ node ].append( 'echo "' + DB_TABLE_CREATE_INFO + '" | psql ' + sub_db )
                    cmd_for_node[ node ].append( 'echo "COPY ' + table_name + ' FROM \'/tmp/' + str(sub_part) + '-out_for_global_parition\' WITH DELIMITER E\'' + data_field_delimiter + '\';" | psql ' + sub_db )
                    sub_part = sub_part + 1
                    #cmd_for_node[ node ].append( 'rm -rf /tmp/'+str(sub_part)+'-out_for_global_parition' )
                cmd_for_node[ node ].append( 'rm -rf /tmp/*out_for_global_parition' )
        partition_index = partition_index + 1

    print '\n=> To configure your nodes...\n'
    for node in node_list:
        thread.start_new_thread( executeThreadForNode , ( node , DEBUG_MODE ) )
    while ( len(completed.keys()) < len(node_list) ) :
        os.system('sleep 2')

    if DEBUG_MODE :
        print '$ ' + BIN_HADOOP + ' fs -rmr ' + data_partition_out
    else:
        os.system( BIN_HADOOP + ' fs -rmr ' + data_partition_out )

    print '\n=> To setup the external table for Hive\n'
    if DEBUG_MODE :
        print '$ ' + BIN_HADOOP + ' fs -mkdir ' + hive_db_dir_in_hdfs
        print '$ ' + BIN_HADOOP + ' fs -rmr ' + hive_db_dir_in_hdfs + '/' + table_name
    else:
        os.system( BIN_HADOOP + ' fs -mkdir ' + hive_db_dir_in_hdfs )
        os.system( BIN_HADOOP + ' fs -rmr ' + hive_db_dir_in_hdfs + '/' + table_name )

    cmd_exec = ' echo "drop table ' + table_name + ';" | ' + BIN_HIVE
    if DEBUG_MODE :
        print '$ ' + cmd_exec
    else:
        os.system( cmd_exec )

    create_hive_external_table = ' ROW FORMAT DELIMITED FIELDS TERMINATED BY \'' + db_field_delimiter + '\''
    create_hive_external_table += ' STORED AS '
    create_hive_external_table += ' INPUTFORMAT \'edu.yale.cs.hadoopdb.sms.connector.SMSInputFormat\' '
    create_hive_external_table += ' OUTPUTFORMAT \'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\' '
    create_hive_external_table += ' LOCATION \'' + hive_db_dir_in_hdfs + '/' + table_name + '\'; '
    DB_TABLE_CREATE_INFO = DB_TABLE_CREATE_INFO.replace( ";" , ' ' ).replace( 'precision' , '' ).replace( 'create table' , 'create external table' )
    DB_TABLE_CREATE_INFO = re.sub( 'varchar\([\d]+\)|text' , 'string' , DB_TABLE_CREATE_INFO )
    create_hive_external_table = DB_TABLE_CREATE_INFO + create_hive_external_table
    cmd_exec = ' echo "' + create_hive_external_table + '" | ' + BIN_HIVE
    if DEBUG_MODE :
        print '$ ' + cmd_exec
    else:
        os.system( cmd_exec )

def executeThreadForNode( node , DEBUG_MODE = True , *args ):
    for cmd in cmd_for_node[node] :
        if cmd == None or cmd == '' :
            continue
        cmd = cmd.strip()
        if cmd == '' :
            continue
        cmd = cmd.replace( '"' , '\\"' )
        cmd_exec = "ssh %s \"%s\"" % ( node , cmd )
        print "\t" , cmd_exec
        if DEBUG_MODE == False :
            os.system( cmd_exec )
    completed[node] = "true"

def main( argv=None ):
    parser = OptionParser()
    parser.add_option( "-H" , "--source_dir_in_hdfs" , dest="source_dir_in_hdfs" , default=None , help="dir for data source in HDFS" )
    parser.add_option( "-D" , "--source_data_delimiter" , dest="source_data_delimiter" , default='\n' , help="record delimiter for the source" )
    parser.add_option( "-F" , "--source_field_delimiter" , dest="source_field_delimiter" , default='\t' , help="field delimiter for a record" )
    parser.add_option( "-P" , "--source_partition_dir" , dest="source_partition_dir" , default="tmp_out_hadoopdb" , help="temp dir in HDFS for source partition" )
    parser.add_option( "-N" , "--node_list_file" , dest="node_list_file" , default="nodes.txt" , help="file listing each node's IP address" )
    parser.add_option( "-c" , "--chunk_num" , dest="chunk_num" , default=0 , help="number of databases for each node" )
    parser.add_option( "-t" , "--table_name" , dest="table_name" , default="hadoopdb" , help="table name for creation on Hive and databases" )
    parser.add_option( "-i" , "--table_field_info_file" , dest="table_field_info_file" , default="table_create" , help="file for table field definition only" )
    parser.add_option( "-u" , "--db_username" , dest="db_username" , default="hadoop" , help="username for login the databases on each node" )
    parser.add_option( "-p" , "--db_password" , dest="db_password" , default="1234" , help="password for login the databases on each node" )
    parser.add_option( "-d" , "--db_field_delimiter" , dest="db_field_delimiter" , default="|" , help="field delimiter for the databases" )
    parser.add_option( "-w" , "--hive_db_dir" , dest="hive_db_dir" , default='/db' , help="path in HDFS for Hive to save the tables" )
    parser.add_option( "-f" , "--catalog_properties" , dest="catalog_properties" , default='/tmp/Catalog.properties' , help="output file for Catalog.properties" )
    parser.add_option( "-x" , "--hadoopdb_xml" , dest="hadoopdb_xml" , default="HadoopDB.xml" , help="output file for HadoopDB.xml" )
    parser.add_option( "-y" , "--hadoopdb_xml_in_hdfs" , dest="hadoopdb_xml_in_hdfs" , default="HadoopDB.xml" , help="filename for HadoopDB.xml in HDFS" )
    parser.add_option( "-g" , "--go" , action="store_false" , dest="mode" , default=True , help="set it to execute the commands" )
    ( options , args ) = parser.parse_args()
    #print options
    #return
    #initHadoopDB( data_in_hdfs='src' , data_partition_out='tmp_out' , table_name='justtest' , table_field_info='table_create' )
    if options.source_dir_in_hdfs is None :
        print "Please input the source dir in HDFS by '--source_dir_in_hdfs' "
        return
    if os.path.isfile( options.node_list_file ) is False :
        print "Please check the '" + options.node_list_file + "' path and setup by '--node_list_file'"
    if options.mode is True :
        print "\n Current Status is just Debug Mode for show all commands\n please set '-g' or '--go' option to execute them after check all commands.(look at the 'rm -rf' and 'hadoop fs -rmr')\n"
    initHadoopDB( data_in_hdfs = options.source_dir_in_hdfs , data_delimiter = options.source_data_delimiter , data_field_delimiter = options.source_field_delimiter , data_partition_out = options.source_partition_dir , nodes_in_file = options.node_list_file , chunks_per_node = options.chunk_num , table_name = options.table_name , table_field_info = options.table_field_info_file , db_user = options.db_username , db_pass = options.db_password , db_field_delimiter = options.db_field_delimiter , hive_db_dir_in_hdfs = options.hive_db_dir , tmp_path_for_catelog = options.catalog_properties , out_hadoop_xml = options.hadoopdb_xml , hadoop_xml_in_hdfs = options.hadoopdb_xml_in_hdfs , DEBUG_MODE = options.mode )
    print "\n\n=> All Execution Completed..."

if __name__ == "__main__":
    main()
A few common problems are also worth recording here:
hadoop@Cluster01:~$ echo "drop table justest;" | /home/hadoop/SMS_dist/bin/hive
Hive history file=/tmp/hadoop/hive_job_log_hadoop_201001081445_409015456.txt
hive> drop table justest;
FAILED: Error in metadata: javax.jdo.JDOFatalDataStoreException: Failed to start database 'metastore_db', see the next exception for details.
NestedThrowables:
java.sql.SQLException: Failed to start database 'metastore_db', see the next exception for details.
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask
hive>
Messages like this appear when two clients are using Hive at the same time; the embedded metastore (metastore_db) only allows one client to operate at a time.
hadoop@Cluster01:~$ sudo vim /etc/postgresql/8.4/main/pg_hba.conf
local all all trust
# IPv4 local connections:
#host all all 127.0.0.1/32 md5
host all all 127.0.0.1/32 password
host all all 10.0.0.1/8 password # 加上Cluster機器上的範圍
# IPv6 local connections:
#host all all ::1/128 md5
host all all ::1/128 password
hadoop@Cluster01:~$ sudo su - postgres
postgres@Cluster01:~$ createuser hadoop
Type "help" for help.
postgres=# alter user hadoop with password '1234';
ALTER ROLE
postgres=# \q
postgres@Cluster01:~$ exit
Hive history file=/tmp/hadoop/hive_job_log_hadoop_201001081445_409015456.txt
Is there any way to solve this?
Author's reply: (05/15/2010 01:42:22 AM)
Sorry, I haven't touched this in a while and I can't figure out what your question is Orz
hadoop@Cluster01:~ $ SMS_dist/bin/hive
###########################
After entering the command above, the following message appears
Hive history file=/tmp/hadoop/hive_job_log_hadoop_201005271048_1660712084.txt
Hive>
Is this message an error? How should it be resolved?
###########################
CREATE EXTERNAL TABLE raw (
ID int,
NAME string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS
INPUTFORMAT 'edu.yale.cs.hadoopdb.sms.connector.SMSInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/db/raw';
Author's reply: (05/27/2010 02:00:51 AM)
Oh, that should be fine. It's just an informational message, meaning the job commands that follow will be logged to that file; a real error would start with "FAILED:" or something similar. I haven't played with this for a long time and no longer have an environment to run it on, so I can't truly confirm it for you.
One last note: HadoopDB is used together with Hive, so problems can be hard to debug. It helps to first understand roughly what Hive does: it provides a framework that turns SQL statements into Hadoop jobs, and HadoopDB essentially reuses Hive's framework while changing the part that reads the underlying data so it reads from databases instead.
Because of that, a broken configuration leads to problems very easily. I suggest first running a few jobs with plain Hive, and only then putting HadoopDB on top.
Good luck with it!
Load the data
* hadoop@Cluster01:~$ psql udb0
udb0=# COPY RAW FROM '/home/hadoop/p0' WITH DELIMITER E'\t' ;
* the file /home/hadoop/p0 is produced by splitting the original large input file with the partitioning tool shipped with HadoopDB
o $ hadoop jar lib/hadoopdb.jar edu.yale.cs.hadoopdb.dataloader.GlobalHasher src_in_hdfs out_in_hdfs 3 '\n' 0
o $ hadoop fs -get out_in_hdfs/part-00000 /home/hadoop/p0
* assuming the data sits in /home/hadoop/p0 with tab-separated fields
* the same also has to be done on Cluster02 and Cluster03
###################
Is there supposed to be data under the src_in_hdfs directory?
I'm using hadoop-0.20.1, so I'm not sure about this part.
I created src_in_hdfs myself with -mkdir src_in_hdfs.
After the split, the output contains
out_in_hdfs/part-0000
out_in_hdfs/part-0001
out_in_hdfs/part-0002
but they are all empty.
Could it be because of the version difference?
Author's reply: (05/27/2010 03:08:07 AM)
I used Hadoop 0.19.2 back then, mainly because that is the version HadoopDB was built against.
There were indeed some API and architecture changes between Hadoop 0.19.x and 0.20.x, so it is possible that HadoopDB's code does not run properly on 0.20.1; the version difference may well be the problem. Keep an eye on whether HadoopDB gets updated; otherwise I'd recommend staying on Hadoop 0.19.x!
That said, this brings its own problems: for general Hadoop work it is better to develop against the newer version, because 0.19.x and 0.20.x have diverged, and staying on 0.19.x means drifting further and further away from current development orz