Tuesday, January 22, 2013

Writes MVCC in HBase

In a post on the HBase community blog, Gregory Chanan describes how MVCC works in HBase, especially on the read side. Here is the figure he used to demonstrate the idea.


Fig. 1  Write Steps with MVCC

In this figure, each write operation must obtain a RowLock before performing its actual WAL and Memstore writes. The sequential behavior that the lock enforces is time consuming and, in my opinion, not necessary. We can guarantee that writes do not overlap each other by giving each write a local sequence number: seqn.

1) If a thread tries to update a cell and finds that its write's seqn is less than the cell's largest "finished seqn", the update is discarded; otherwise the write is applied to the WAL and the Memstore. When every cell that a write operation needs to modify has been either applied or discarded, its seqn is declared finished.

2) Each read operation is assigned a read timestamp, which is the highest value x such that all writes with write number <= x have completed. The read for a given (row, column) then returns the data cell whose write number is the largest value less than or equal to the read timestamp.
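The two rules can be tried out in a small in-memory model. This is only a sketch and not HBase code: the class MvccSketch and its methods are hypothetical names, seqn values are assumed to start at 1 and to be handed out without gaps, and every version of a cell is kept, so the discard optimization from rule 1 is left out.

```java
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Toy model of the two MVCC rules; hypothetical names, not HBase code. */
final class MvccSketch {
    // cell name -> (write seqn -> value); all versions are kept so a read can choose one.
    private final Map<String, ConcurrentSkipListMap<Long, String>> cells =
        new ConcurrentHashMap<>();
    // seqns that have finished but are not yet covered by the read point.
    private final TreeSet<Long> finished = new TreeSet<>();
    // Highest x such that every write with seqn <= x has finished.
    private long readPoint = 0;

    /** Applies one cell update of the write identified by seqn. */
    void write(String cell, long seqn, String value) {
        cells.computeIfAbsent(cell, k -> new ConcurrentSkipListMap<>()).put(seqn, value);
    }

    /** Declares a write finished and advances the read point past any finished prefix. */
    synchronized void finish(long seqn) {
        finished.add(seqn);
        while (finished.remove(readPoint + 1)) {
            readPoint++;
        }
    }

    /** The read timestamp a newly started read would be assigned. */
    synchronized long readPoint() {
        return readPoint;
    }

    /** Returns the version with the largest seqn <= readTimestamp, or null if none exists. */
    String read(String cell, long readTimestamp) {
        ConcurrentSkipListMap<Long, String> versions = cells.get(cell);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, String> entry = versions.floorEntry(readTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

A read that starts while write 2 is still running gets read timestamp 1, so it never sees a partial result of write 2, even for cells that write 2 has already touched.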

Figure 2 shows a typical lock-free MVCC write situation. There are two write operations with seqn=1 and seqn=2. The first write operation updates cell-1 and cell-3; the second write operation updates cell-2 and cell-3. Besides these two writes, there are two reads: read-1 begins just after the first write succeeds and before the second write finishes, and read-2 begins after the second write finishes.

Fig. 2  MVCC Writes Use Cases.

From Fig. 2, we can conclude that each cell ends up holding the value written by the write operation with the highest seqn, and that read operations are guaranteed to see only values written by finished writes. However, for simplicity, Fig. 2 does not show concurrent writes.

Fig. 3 shows two write operations with seqn1 and seqn2. The first write updates three cells (1, 2, 3) and the second write updates two cells (2, 3). When the read operation with sequence number 1 begins, the first write has finished and the second write is still in progress. The read therefore gets all the cell values written by sequence number 1, without any partial updates from the second write.


Fig. 3  MVCC Concurrent Writes

This MVCC write strategy improves HBase writes in two ways: 1) it allows different writes to execute in parallel; 2) it allows us to discard the parts of a write operation whose sequence number is smaller than the current finished seqn.

Tuesday, January 15, 2013

HBase 0.95 Source Code Analysis


As the HBase homepage describes it: Apache HBase is an open-source, distributed, versioned, column-oriented store modeled after Google's Bigtable: A Distributed Storage System for Structured Data by Chang et al. Just as Bigtable leverages the distributed data storage provided by the Google File System, Apache HBase provides Bigtable-like capabilities on top of Hadoop and HDFS.

HBase 0.95 is the most recent version. In this post, we record the source code analysis from several angles: 1) how a client connects to HBase; 2) how the servers inside HBase connect to each other; 3) how data is written to or read from HBase; 4) finally, the important APIs and their implementations. To keep this article meaningful to readers who are not currently reading the source, I will try not to list too much code; instead, I will describe how it works and why. I hope this post is helpful for anyone interested in HBase.


SECTION 1. How Clients Connect to HBase

Connecting to HBase is simplest from a Java client. Here is an example:

    HTable htable = ...   // instantiate HTable
    // Read one row.
    Get get = new Get(Bytes.toBytes("row1"));
    Result r = htable.get(get);
    // Write one cell; row and data are assumed to be defined by the caller.
    Put put = new Put(Bytes.toBytes(row));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("attr1"), Bytes.toBytes(data));
    htable.put(put);
In the HBase source, the first step is instantiating HTable. To instantiate an HTable, you must provide at least the table name and a Configuration instance, which calls:
    public HTable(Configuration conf, final byte [] tableName)
One of the most important tasks of this constructor is to obtain an HConnection instance and to create a ThreadPoolExecutor. All HConnections are managed by HConnectionManager. HConnectionManager has an inner class named HConnectionKey, which captures the parameters set by the configuration file of an HBase instance, and HBase chooses the HConnectionImplementation that matches this key. This means HBase can reuse a connection when HTables come from the same user with the same configuration. So the real connection from user clients to HBase servers is implemented by HConnectionManager.HConnectionImplementation.

The constructor of HConnectionImplementation sets all the private fields, such as conf, managed, pause, numRetries, maxRPCAttempts, rpcTimeout, and the cluster ID. Besides these settings, two Class objects are created for later use: adminClass and clientClass. They are loaded through Java reflection like this:
    String adminClassName = conf.get(REGION_PROTOCOL_CLASS,
        DEFAULT_ADMIN_PROTOCOL_CLASS);
    this.adminClass =
        (Class<? extends AdminProtocol>) Class.forName(adminClassName);
    String clientClassName = conf.get(CLIENT_PROTOCOL_CLASS,
        DEFAULT_CLIENT_PROTOCOL_CLASS);
    this.clientClass =
        (Class<? extends ClientProtocol>) Class.forName(clientClassName);
When an application uses an HConnection, HConnectionImplementation is responsible for returning an RPC protocol instance that matches the request (admin or client). This is the job of the "getProtocol" method in HConnectionImplementation. The RPC implementation is the core concept for understanding how clients connect to HBase, so the next section describes it in detail.
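The connection reuse described above can be pictured as a cache keyed by the settings that identify a connection. The sketch below is a simplified illustration, not the HConnectionManager source: Key, Connection, and the two fields used as the key are hypothetical stand-ins for HConnectionKey and HConnectionImplementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Illustrative connection cache; names and fields are assumptions, not HBase code. */
final class ConnectionCacheSketch {
    /** Stand-in for HConnectionKey: the settings that identify a connection. */
    static final class Key {
        final String zkQuorum;
        final String user;

        Key(String zkQuorum, String user) {
            this.zkQuorum = zkQuorum;
            this.user = user;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return zkQuorum.equals(other.zkQuorum) && user.equals(other.user);
        }

        @Override
        public int hashCode() {
            return Objects.hash(zkQuorum, user);
        }
    }

    /** Stand-in for HConnectionImplementation. */
    static final class Connection {
        final Key key;

        Connection(Key key) {
            this.key = key;
        }
    }

    private final Map<Key, Connection> connections = new HashMap<>();

    /** Returns the cached connection for an equal key, creating it on first use. */
    synchronized Connection getConnection(Key key) {
        return connections.computeIfAbsent(key, Connection::new);
    }
}
```

Two tables opened with equal keys share one Connection object, while a different user or quorum gets its own.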

1.1 RPC and Protocol in HBase

RPC in HBase works as it does in Hadoop: neither uses the built-in Java RPC mechanism; instead, both implement their own RPC mechanism. The difference is that HBase also uses Protobuf as the serialization component.

All the RPC-related source files are located in the ./org/apache/hadoop/hbase/ipc folder. All the protocol-related source files, including the .proto files and the generated *Proto files, are located in the hadoop-protocol folders.

Before discussing how HBase implements RPC and combines it with Protobuf, we first introduce the built-in dynamic proxy mechanism in Java. It works like the Proxy design pattern, except that the Proxy and InvocationHandler classes make the pattern automatic in Java.
Fig.1 Java Dynamic Proxy

As Fig.1 shows, the proxy object is generated by calling the Proxy.newProxyInstance method instead of being written by hand, and each method call is automatically forwarded to the real object through the InvocationHandler's invoke method.
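A minimal sketch of the mechanism, using only java.lang.reflect from the JDK. The Greeter interface, RealGreeter, and the calls list are made-up names for this illustration; the handler records each call and then forwards it to the real object.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Dynamic proxy illustration; Greeter and RealGreeter are hypothetical. */
final class ProxySketch {
    interface Greeter {
        String greet(String name);
    }

    static final class RealGreeter implements Greeter {
        @Override
        public String greet(String name) {
            return "Hello, " + name;
        }
    }

    // Names of the methods that went through the proxy, in call order.
    static final List<String> calls = new ArrayList<>();

    /** Builds a proxy that records each call and then forwards it to the real object. */
    static Greeter proxyFor(Greeter real) {
        InvocationHandler handler = (proxy, method, args) -> {
            calls.add(method.getName());
            return method.invoke(real, args);
        };
        return (Greeter) Proxy.newProxyInstance(
            Greeter.class.getClassLoader(),
            new Class<?>[] {Greeter.class},
            handler);
    }
}
```

The caller only sees a Greeter; no proxy class is written by hand, and the handler is the single place where every call can be intercepted.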

In HBase, the RPC classes map onto this mechanism as Fig.2 shows:
Fig.2 HBase RPC protocol dynamic proxy mechanism

HBase uses a lot of concurrency to speed up client reads and writes. To connect to a RegionServer, we need a "ClientProtocol" instance, which is obtained by calling HConnectionImplementation's getClient method. Starting from here, a chain of calls ties RPC and Protobuf together:

HConnectionImplementation.getClient -> getProtocol -> HBaseClientRPC.waitForProxy -> HBaseClientRPC.getProxy -> RpcClientEngine.getProxy -> ProtobufRpcClientEngine.getProxy

Here, the ProtobufRpcClientEngine.getProxy() method generates a proxy for the core 'ClientProtocol' instance that "HConnectionImplementation.getClient()" returns. The "invoke" function in ProtobufRpcClientEngine's static "Invoker" class does the RPC client work:
    public Object invoke(Object proxy, Method method, Object[] args)
        throws ServiceException {
      long startTime = 0;
      if (LOG.isDebugEnabled()) {
        startTime = System.currentTimeMillis();
      }
      RpcRequestBody rpcRequest = constructRpcRequest(method, args);
      Message val = null;
      try {
        val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout);
        if (LOG.isDebugEnabled()) {
          long callTime = System.currentTimeMillis() - startTime;
          if (LOG.isTraceEnabled()) LOG.trace("Call: " + method.getName() + " " + callTime);
        }
        return val;
      } catch (Throwable e) {
        if (e instanceof RemoteException) {
          Throwable cause = ((RemoteException)e).unwrapRemoteException();
          throw new ServiceException(cause);
        }
        throw new ServiceException(e);
      }
    }
It constructs the RPC request and calls client.call() to do the real communication. Here, HBase uses Google's Protobuf to construct the RPC request, an RpcRequestBody. The "HBaseClient" class then handles the RPC request and returns a Message object as the result.
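The same shape can be reproduced without HBase or Protobuf to see what the Invoker does. In this sketch, Request stands in for RpcRequestBody, KeyValueProtocol for ClientProtocol, and the transport function for HBaseClient.call(); all three are assumptions made for the illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Client-side RPC shape in miniature; every name here is hypothetical. */
final class RpcInvokerSketch {
    /** Stand-in for RpcRequestBody: the method name plus its arguments. */
    static final class Request {
        final String methodName;
        final List<Object> args;

        Request(String methodName, Object[] args) {
            this.methodName = methodName;
            this.args = args == null ? Collections.emptyList() : Arrays.asList(args);
        }
    }

    /** Stand-in for ClientProtocol. */
    interface KeyValueProtocol {
        String get(String row);
    }

    /** Returns a proxy whose method calls become Request objects handed to the transport. */
    static <T> T getProxy(Class<T> protocol, Function<Request, Object> transport) {
        InvocationHandler invoker =
            (proxy, method, args) -> transport.apply(new Request(method.getName(), args));
        return protocol.cast(Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class<?>[] {protocol}, invoker));
    }
}
```

The caller invokes get("row1") on an ordinary interface, and only the handler knows that the call is packaged and shipped elsewhere.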

At this point the client-side code is done: the method call on ClientProtocol is captured, serialized by Protobuf, and sent to the RegionServer. This ends SECTION 1.



SECTION 2. How Servers inside HBase Connect to Each other

There are two different roles in HBase: HMaster and HRegionServer. One HMaster is elected and started, and an HRegionServer runs on each server. The entry point of HMaster is in the HMaster.java file, which contains the static main method. All the main functions in HBase delegate to the abstract ServerCommandLine class and its doMain method. Inside doMain, HBase calls ToolRunner.run(conf, this, arg) to do the real startup work. The ToolRunner class is defined in Hadoop Core, so we won't describe it here. The only thing we need to know is that once ToolRunner.run begins, it calls the "this.run" method. In our case that is HMasterCommandLine.run, which finally calls the "startMaster" method.

After the arguments are parsed, control returns to HBase code, which: 1) calls HMaster.constructMaster() to build an HMaster instance; 2) calls the start method of this instance; 3) calls the join method of this instance; 4) checks for errors.

In constructMaster(), HBase uses reflection to build the HMaster instance. In fact, I am still curious why the instance is constructed this way. Is it faster, or more flexible?
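One plausible answer, offered as an assumption rather than something this post verified in the HBase source: reflective construction is generally not faster than calling a constructor directly, so flexibility is the more likely motive, since the concrete class can be chosen at run time (for example a test subclass). The sketch below shows the pattern with hypothetical Master and TestMaster classes and java.util.Properties standing in for the HBase Configuration.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

/** Reflective construction pattern; Master and TestMaster are hypothetical. */
final class ReflectiveConstructSketch {
    static class Master {
        final Properties conf;

        public Master(Properties conf) {
            this.conf = conf;
        }

        String name() {
            return "default master";
        }
    }

    static class TestMaster extends Master {
        public TestMaster(Properties conf) {
            super(conf);
        }

        @Override
        String name() {
            return "test master";
        }
    }

    /** Builds whichever Master subclass the caller names, through its (Properties) constructor. */
    static Master constructMaster(Class<? extends Master> masterClass, Properties conf) {
        try {
            Constructor<? extends Master> constructor = masterClass.getConstructor(Properties.class);
            return constructor.newInstance(conf);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to construct " + masterClass.getName(), e);
        }
    }
}
```

The startup code depends only on the Master type and a constructor convention, so swapping the implementation needs no change to the code that starts it.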

Because HMaster is a subclass of the abstract class HasThread, it is automatically Runnable. Calling start means the HMaster thread begins to work, and join means the JVM won't exit until HMaster exits. The way HMaster starts is interesting: when users boot a cluster, all the servers start an HMaster instance, and these instances race with each other to write something to ZooKeeper. ZooKeeper guarantees that only one HMaster succeeds. All the others block until the successful one fails and then race again. The successful one initializes itself and enters an infinite loop to provide service.
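The race can be modelled in a few lines. This sketch does not use the ZooKeeper API: a single guarded field stands in for the ZooKeeper node that the candidates try to create, and the class and method names are hypothetical.

```java
/** Master election in miniature; a guarded field replaces the ZooKeeper node. */
final class MasterElectionSketch {
    // Name of the active master, or null while nobody holds the role.
    private String activeMaster;

    /** Returns true only for the first candidate that claims the role. */
    synchronized boolean tryBecomeActive(String serverName) {
        if (activeMaster != null) {
            return false;
        }
        activeMaster = serverName;
        return true;
    }

    /** Models the active master failing, which frees the role for the waiting candidates. */
    synchronized void expire(String serverName) {
        if (serverName.equals(activeMaster)) {
            activeMaster = null;
        }
    }

    synchronized String active() {
        return activeMaster;
    }
}
```

A losing candidate would retry tryBecomeActive after the active master expires, which mirrors the blocked masters racing again.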

At this point HMaster has started, and all the RegionServers will connect to it.

As described before, every server also runs an HRegionServer (HRS for short). HRS starts in exactly the same way as HMaster: the main function calls the HRegionServerCommandLine.doMain() method. In the run method of HRS, we first try to register with the HMaster to tell it we are here. After registering with HMaster, HRS enters an infinite loop. Inside the loop, HRS handles region shutdown and reports to HMaster once per msgInterval. The heartbeat report is implemented by the function tryRegionServerReport(lastMsg, now). HRS has now started, and the servers inside HBase can communicate.



2.1 RPC Server Start in HBase

We know that HBase has several RPC protocols and that only two kinds of servers, HMaster and HRegionServer, run in an HBase instance. So each server works as an RPC server for a different set of protocols.

HMaster implements: MasterMonitorProtocol, MasterAdminProtocol, RegionServerStatusProtocol
HRS implements: ClientProtocol, AdminProtocol

Though the protocols they implement are quite different, the RPC server startup procedures are quite similar. For example, in HRS, the RPC server is initialized in its constructor as follows:
 this.rpcServer = HBaseServerRPC.getServer(AdminProtocol.class, this,  
     new Class<?>[]{ClientProtocol.class,  
       AdminProtocol.class, HBaseRPCErrorHandler.class,  
       OnlineRegions.class},  
     initialIsa.getHostName(), // BindAddress is IP we got for this server.  
     initialIsa.getPort(),  
     conf.getInt("hbase.regionserver.handler.count", 10),  
     conf.getInt("hbase.regionserver.metahandler.count", 10),  
     conf.getBoolean("hbase.rpc.verbose", false),  
     conf, HConstants.QOS_THRESHOLD);  
In HMaster, the RPC server is initialized in exactly the same place with similar code:
 this.rpcServer = HBaseServerRPC.getServer(MasterMonitorProtocol.class, this,  
     new Class<?>[]{MasterMonitorProtocol.class,  
       MasterAdminProtocol.class, RegionServerStatusProtocol.class},  
     initialIsa.getHostName(), // BindAddress is IP we got for this server.  
     initialIsa.getPort(),  
     numHandlers,  
     0, // we dont use high priority handlers in master  
     conf.getBoolean("hbase.rpc.verbose", false), conf,  
     0); // this is a DNC w/o high priority handlers  
So the startup of all these RPC servers is handled by the HBaseServerRPC class.
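Both getServer calls pass one server object together with the list of protocol interfaces it serves. The sketch below shows that idea only, under invented names: ClientOps and AdminOps stand in for ClientProtocol and AdminProtocol, and an incoming call is dispatched by protocol name and method name through reflection. The real HBaseServerRPC also handles sockets, handler threads, and Protobuf, none of which is modelled here.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** One object serving several protocols; all names are hypothetical. */
final class RpcServerSketch {
    interface ClientOps {
        String get(String row);
    }

    interface AdminOps {
        String flush(String region);
    }

    /** A single server object that implements both protocols. */
    static final class RegionServer implements ClientOps, AdminOps {
        @Override
        public String get(String row) {
            return "value-of-" + row;
        }

        @Override
        public String flush(String region) {
            return "flushed-" + region;
        }
    }

    private final Object instance;
    private final Map<String, Class<?>> protocols = new HashMap<>();

    RpcServerSketch(Object instance, Class<?>... protocolInterfaces) {
        this.instance = instance;
        for (Class<?> protocol : protocolInterfaces) {
            if (!protocol.isInstance(instance)) {
                throw new IllegalArgumentException(
                    "Server does not implement " + protocol.getSimpleName());
            }
            protocols.put(protocol.getSimpleName(), protocol);
        }
    }

    /** Dispatches one call to the named method of a registered protocol. */
    Object call(String protocol, String methodName, String arg) {
        Class<?> protocolClass = protocols.get(protocol);
        if (protocolClass == null) {
            throw new IllegalArgumentException("Unknown protocol: " + protocol);
        }
        try {
            Method method = protocolClass.getMethod(methodName, String.class);
            return method.invoke(instance, arg);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Call failed: " + methodName, e);
        }
    }
}
```

Registering the interfaces up front lets the server reject calls for protocols it was never asked to serve.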

How does HBaseServerRPC start the RPC server? It mirrors the way HBaseClientRPC starts, as the following figure shows.
Figure 3. How RPC Server starts in HBase

After these servers start, the communication inside the HBase cluster is clear:
Figure 4. How Servers Communicate inside HBase



... to be continued.



REFERENCE
----------------------
[1] Java Proxy, http://docs.oracle.com/javase/1.4.2/docs/api/java/lang/reflect/Proxy.html
[2] HBase Project, http://hbase.apache.org 


Friday, January 11, 2013

A Strange Java Multi-Thread Case

Today we ran into a nice problem involving Java multi-threading and a few other Java-related tips. I want to write it down to remind myself to stay curious about whatever strange thing I meet, because it may open a brand new window. OK, first of all, here is the Java code:


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Daemon extends Thread {
    private static final int SIZE = 100;
    private Thread[] t = new Thread[SIZE];
    private static volatile int i = 0;   // loop counter shared by every run() call

    public Daemon() {
        start();   // starts the thread from inside the constructor
    }

    public void run() {
        for (i = 0; i < SIZE; i++) {
            t[i] = new DaemonSpawn(i);
            //t[i].start();  [1]
        }
        while (true)
            Thread.yield();
    }
}

class DaemonSpawn extends Thread {
    private int n;

    public DaemonSpawn(int i) {
        this.n = i;
        start();   // also started from inside the constructor
    }

    public void run() {
        System.out.println("DaemonSpawn" + n + "started");
        while (true)
            Thread.yield();
    }
}

public class Daemons {
    public static void main(String... args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Daemon());
        //Thread d = new Daemon();  [2]
        //System.out.println("d.isDaemon() = " + d.isDaemon());
    }
}

As written, the Daemons class creates an ExecutorService and calls its execute method with a new Daemon. The Daemon class constructs a series of DaemonSpawn threads and runs them. It looks like simple Java code, and I guessed the result would be an unordered series of print statements numbered from 0 to 99.

I could not have been more wrong.

The result is an unordered series of print statements, but some numbers are duplicated.

Why?

1. The first lesson I should learn is "NEVER CALL START() IN A THREAD OR RUNNABLE CONSTRUCTOR". When you call start() in a thread's constructor, the constructor may not have finished yet, but the thread is already running because of that start() call. So weird things can happen, and you can never imagine what they will be.

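2. The second tip: Daemon.run() is executed twice. A Thread is also a Runnable, so exec.execute(new Daemon()) makes a pool thread call run() on the Daemon object, while the start() in the constructor has already made the Daemon's own thread call run() as well. The two executions share the static counter i, so both loops create DaemonSpawn threads from the same counter, and you see duplicated print statements.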

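If you use [2] instead of the exec.execute(...) line, run() executes only once and the duplicates disappear. [1] is the matching fix for the first lesson: remove start() from the DaemonSpawn constructor and start each thread explicitly after it has been constructed.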
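Both lessons applied together look roughly like the sketch below. It is a reworked illustration rather than the original program: the class names are new, the workers record their number and exit instead of spinning forever, and the loop counter is a local variable so nothing is shared between runs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Corrected pattern: construct first, start afterwards, no shared loop counter. */
final class SpawnSketch {
    static final class Spawn extends Thread {
        private final int n;
        private final List<Integer> started;

        Spawn(int n, List<Integer> started) {
            // The constructor only initializes fields; it does not call start().
            this.n = n;
            this.started = started;
        }

        @Override
        public void run() {
            started.add(n);
        }
    }

    /** Starts size workers and returns the numbers they recorded, in arrival order. */
    static List<Integer> runAll(int size) {
        List<Integer> started = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < size; i++) {   // local counter, not a shared static field
            Thread t = new Spawn(i, started);
            threads.add(t);
            t.start();                     // started exactly once, after construction
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return started;
    }
}
```

The order of the recorded numbers still varies from run to run, but each number from 0 to size - 1 appears exactly once.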


REFERENCE:
----------------------
[1] http://stackoverflow.com/questions/5623285/java-why-not-to-start-a-thread-in-the-constructor-how-to-terminate

Wednesday, December 5, 2012

Web Data Mining

Web Data Mining, Bing Liu (UIC)


Chapter 1: Association Rule and Sequence Pattern

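Association rules are a way to investigate, in a statistical sense, the regularities hidden in data. They describe the regularity of a data set with two measures: support and confidence. Given a set of transactions, typically customer purchase records, click records, and similar data, we want to find items that are clearly associated with each other. We define 'X->Y' as an association rule. If its support is 10% and its confidence is 80%, then X and Y appear together in 10% of all transactions, and Y appears in 80% of the transactions in which X appears.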

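We usually use the Apriori algorithm to compute all association rules for a given set of transactions, a minimum support threshold (which defines the frequent itemsets), and a minimum confidence threshold (which defines the confident association rules).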


The Apriori algorithm consists of two steps: 1) find all frequent itemsets; 2) derive the confident association rules from the frequent itemsets.

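Frequent itemsets have a useful property: every non-empty subset of a frequent itemset with k items is itself a frequent itemset. We start from the frequent itemsets that contain a single item, and build the frequent itemsets with k items from those with k-1 items. The construction uses a trick: pick two frequent k-itemsets f1 and f2 whose first k-1 items are equal and whose k-th items satisfy f1[k] < f2[k]. The candidate (k+1)-itemset then consists of the shared first k-1 items plus f1[k] and f2[k], k+1 items in total, and it is kept if it meets the minimum support.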

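The correctness of this algorithm is easy to show. By the property above, every k-item subset of a frequent (k+1)-itemset must itself be frequent, and all frequent k-itemsets have already been computed. So {a1, a2, a3, … ak-1, ak} and {a1, a2, a3, … ak-1, ak+1} are both among the frequent k-itemsets, and joining them is guaranteed to produce the frequent (k+1)-itemset.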

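Generating the association rules is even simpler: given the frequent itemsets, enumerate Y in f - Y -> Y and compute the confidence. There is one more trick here: if f - Y -> Y meets the confidence requirement, then every f - Ysub -> Ysub, where Ysub is a non-empty subset of Y, meets it as well. The confidence of such a rule is f.count / (f - Ysub).count, and (f - Ysub).count can be no larger than (f - Y).count.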
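The two steps can be sketched on a toy data set. This is a simplified illustration, not the book's pseudocode: the class and method names are made up, the threshold is given as an absolute count (minCount) rather than a percentage, and every joined candidate is simply counted against the transactions instead of being pruned through its subsets first.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Simplified Apriori; hypothetical names, absolute-count threshold, no subset pruning. */
final class AprioriSketch {
    /** Number of transactions that contain every item of the itemset. */
    static int count(List<Set<String>> transactions, Set<String> itemset) {
        int n = 0;
        for (Set<String> t : transactions) {
            if (t.containsAll(itemset)) {
                n++;
            }
        }
        return n;
    }

    /** Step 1: all itemsets contained in at least minCount transactions. */
    static List<TreeSet<String>> frequentItemsets(List<Set<String>> transactions, int minCount) {
        TreeSet<String> items = new TreeSet<>();
        for (Set<String> t : transactions) {
            items.addAll(t);
        }
        // Level 1: frequent itemsets with a single item, in sorted order.
        List<TreeSet<String>> level = new ArrayList<>();
        for (String item : items) {
            TreeSet<String> single = new TreeSet<>();
            single.add(item);
            if (count(transactions, single) >= minCount) {
                level.add(single);
            }
        }
        List<TreeSet<String>> all = new ArrayList<>();
        while (!level.isEmpty()) {
            all.addAll(level);
            List<TreeSet<String>> next = new ArrayList<>();
            for (int i = 0; i < level.size(); i++) {
                for (int j = i + 1; j < level.size(); j++) {
                    TreeSet<String> f1 = level.get(i);
                    TreeSet<String> f2 = level.get(j);
                    // Join only itemsets that share their first k-1 items.
                    if (!f1.headSet(f1.last()).equals(f2.headSet(f2.last()))) {
                        continue;
                    }
                    TreeSet<String> candidate = new TreeSet<>(f1);
                    candidate.add(f2.last());
                    if (count(transactions, candidate) >= minCount) {
                        next.add(candidate);
                    }
                }
            }
            level = next;
        }
        return all;
    }

    /** Step 2: confidence of the rule (f - y) -> y, computed as f.count / (f - y).count. */
    static double confidence(List<Set<String>> transactions, Set<String> f, Set<String> y) {
        Set<String> x = new HashSet<>(f);
        x.removeAll(y);
        return (double) count(transactions, f) / count(transactions, x);
    }
}
```

A rule generator would call confidence for each non-empty proper subset y of a frequent itemset f and keep the rules that reach the minimum confidence.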

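In practice, however, a single minimum support threshold still makes the algorithm impractical, especially for items that are inherently rare. This motivates association rules with multiple minimum supports.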


Talking about trigger with HBase


Most of us are familiar with triggers in traditional SQL databases. Database administrators use triggers to preserve data integrity or to run useful actions. Most of these triggers are implemented on the ECA (event-condition-action) model. HBase does not provide this feature today, which I believe is critical for many applications, so in the next few articles I would like to talk about how I designed and implemented it on the current HBase release (0.94.2).

1. Our goal

Since coprocessors were introduced in HBase 0.92, it makes no sense to give HBase another tool for preserving data integrity or running pre- and post-actions. What we are building here is a general framework that lets programmers write applications that monitor some fields of an HBase table, check whether conditions are fulfilled, and finally run user-defined functions. So basically, we need a trigger-enabled HBase that:
  • allows programmers to submit 'TRIGGERS'
  • supports "ACTIONS" that are executed automatically in a distributed fashion
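To make the event-condition-action idea concrete, here is a minimal single-process sketch. It says nothing about how the real framework is built: PutEvent, Trigger, submit, and onPut are hypothetical names, and distribution, persistence, and failure handling are all left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** ECA trigger registry in miniature; every name here is hypothetical. */
final class TriggerSketch {
    /** The event: one value written to one column of one table. */
    static final class PutEvent {
        final String table;
        final String column;
        final String value;

        PutEvent(String table, String column, String value) {
            this.table = table;
            this.column = column;
            this.value = value;
        }
    }

    /** A trigger watches one column, checks a condition, and runs an action. */
    static final class Trigger {
        final String table;
        final String column;
        final Predicate<PutEvent> condition;
        final Consumer<PutEvent> action;

        Trigger(String table, String column,
                Predicate<PutEvent> condition, Consumer<PutEvent> action) {
            this.table = table;
            this.column = column;
            this.condition = condition;
            this.action = action;
        }
    }

    private final List<Trigger> triggers = new ArrayList<>();

    /** Registers a trigger; it is active as soon as this method returns. */
    synchronized void submit(Trigger trigger) {
        triggers.add(trigger);
    }

    /** Called for every put; runs the matching triggers and returns how many fired. */
    synchronized int onPut(PutEvent event) {
        int fired = 0;
        for (Trigger t : triggers) {
            boolean watched = t.table.equals(event.table) && t.column.equals(event.column);
            if (watched && t.condition.test(event)) {
                t.action.accept(event);
                fired++;
            }
        }
        return fired;
    }
}
```

Checking the watched table and column before evaluating the condition keeps the cost added to each put small when no trigger matches.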
There are lots of difficulties here:
  • we need to guarantee that once programmers receive a successful submission return value, the "TRIGGERS" have already begun to work.
  • we need to make sure that all the relevant servers hold the "TRIGGER" information. Partial deployment is not allowed.
  • we need to provide an efficient way to detect trigger events quickly with only a small 'PUT' performance degradation.
  • user-defined functions are not restricted to particular types, so we must keep an eye on failures, inconsistency, and out-of-order execution.
  • we need to provide a good load-balancing algorithm that redistributes "TRIGGERS" according to both disk load and CPU load.
  • we need a simple fault-tolerance approach to keep the whole system workable in a practical environment.
2. The Architecture

As the following figure shows, triggers are submitted by users to the TriggerMaster node, which is also the HMaster node in the HBase implementation. The TriggerMaster distributes the trigger structure to the TriggerWorkers, which take charge of trigger deployment and execution.


This architecture is quite simple, which makes scheduling and fault tolerance much easier to handle. Also, the TriggerMaster is selected from all the nodes in the data center through a ZooKeeper-like service, so even if this single node fails, the system can recover by selecting a new TriggerMaster.



2.1 The TriggerMaster

Trigger 
2.2 The TriggerWorker
2.3 The Communication

3. Fault-Tolerance
4. Scheduler
5. Use Cases
6. Analysis