Monday, January 28, 2013

Some Must-Read Recent Papers on Cloud Computing Frameworks




In this post, I will keep updating a list of well-known papers on cloud computing, cloud storage, and related topics from the last 5 to 6 years. I think every new researcher in this area should at least skim them.

Any comments or suggestions are welcome!




  • Dean, Jeffrey, and Sanjay Ghemawat. "MapReduce: simplified data processing on large clusters." Communications of the ACM 51.1 (2008): 107-113. LINK
  • Ghemawat, Sanjay, Howard Gobioff, and Shun-Tak Leung. "The Google file system." ACM SIGOPS Operating Systems Review. Vol. 37. No. 5. ACM, 2003. LINK
  • Chang, Fay, et al. "Bigtable: A distributed storage system for structured data." ACM Transactions on Computer Systems (TOCS) 26.2 (2008): 4.  LINK
  • Yang, Hung-chih, et al. "Map-reduce-merge: simplified relational data processing on large clusters." Proceedings of the 2007 ACM SIGMOD international conference on Management of data. ACM, 2007. LINK
  • Bu, Yingyi, et al. "HaLoop: Efficient iterative data processing on large clusters." Proceedings of the VLDB Endowment 3.1-2 (2010): 285-296. LINK
  • Borthakur, Dhruba, et al. "Apache Hadoop goes realtime at Facebook." Proceedings of the 2011 international conference on Management of data. ACM, 2011. LINK
  • Burrows, Mike. "The Chubby lock service for loosely-coupled distributed systems." Proceedings of the 7th symposium on Operating systems design and implementation. USENIX Association, 2006. LINK
  • DeCandia, G., Hastorun, D., Jampani, M., Kakulapati, G., Lakshman, A., Pilchin, A., ... & Vogels, W. (2007, October). Dynamo: amazon's highly available key-value store. In ACM SIGOPS Operating Systems Review (Vol. 41, No. 6, pp. 205-220). ACM. LINK
  • Neumeyer, L., Robbins, B., Nair, A., & Kesari, A. (2010, December). S4: Distributed stream computing platform. In Data Mining Workshops (ICDMW), 2010 IEEE International Conference on (pp. 170-177). IEEE. LINK
  • McKusick, Marshall Kirk, and Sean Quinlan. "GFS: Evolution on fast-forward." ACM Queue 7.7 (2009): 10-20. LINK
  • Weil, Sage A., et al. "Ceph: A scalable, high-performance distributed file system." Proceedings of the 7th Symposium on Operating Systems Design and Implementation (OSDI). 2006. LINK
  • Lakshman, Avinash, and Prashant Malik. "Cassandra—A decentralized structured storage system." Operating systems review 44.2 (2010): 35. LINK
  • Baker, J., Bond, C., Corbett, J. C., Furman, J. J., Khorlin, A., Larson, J., ... & Yushprakh, V. (2011, January). Megastore: Providing scalable, highly available storage for interactive services. In Proc. of CIDR (pp. 223-234). LINK
  • Ousterhout, John, Parag Agrawal, David Erickson, Christos Kozyrakis, Jacob Leverich, David Mazières, Subhasish Mitra et al. "The case for RAMCloud." Communications of the ACM 54, no. 7 (2011): 121-130. LINK
  • Ongaro, Diego, Stephen M. Rumble, Ryan Stutsman, John Ousterhout, and Mendel Rosenblum. "Fast crash recovery in RAMCloud." In Proceedings of the Twenty-Third ACM Symposium on Operating Systems Principles, pp. 29-41. ACM, 2011. LINK
  • Ekanayake, Jaliya, et al. "Twister: a runtime for iterative mapreduce." Proceedings of the 19th ACM International Symposium on High Performance Distributed Computing. ACM, 2010. LINK
  • Zhang, Yanfeng, et al. "Priter: a distributed framework for prioritized iterative computations." Proceedings of the 2nd ACM Symposium on Cloud Computing. ACM, 2011. LINK
  • Peng, Daniel, and Frank Dabek. "Large-scale incremental processing using distributed transactions and notifications." Proceedings of the 9th USENIX conference on Operating systems design and implementation. USENIX Association, 2010. LINK
  • Bhatotia, Pramod, et al. "Incoop: MapReduce for incremental computations." Proceedings of the 2nd ACM Symposium on Cloud Computing. ACM, 2011. LINK
  • Zaharia, M.; Borthakur, D.; Sen Sarma, J.; Elmeleegy, K.; Shenker, S. & Stoica, I. "Delay scheduling: a simple technique for achieving locality and fairness in cluster scheduling" Proceedings of the 5th European conference on Computer systems, 2010, 265-278.  LINK
  • Zaharia, M.; Chowdhury, M.; Franklin, M.; Shenker, S. & Stoica, I. "Spark: cluster computing with working sets" Proceedings of the 2nd USENIX conference on Hot topics in cloud computing, 2010, 10-10   LINK
  • Zaharia, M.; Konwinski, A.; Joseph, A.; Katz, R. & Stoica, I. "Improving mapreduce performance in heterogeneous environments" Proceedings of the 8th USENIX conference on Operating systems design and implementation, 2008, 29-42. LINK
  • Armbrust, M.; Fox, A.; Griffith, R.; Joseph, A.; Katz, R.; Konwinski, A.; Lee, G.; Patterson, D.; Rabkin, A.; Stoica, I. & others "A view of cloud computing" Communications of the ACM, ACM, 2010, 53, 50-58. LINK
  • Lloyd, Wyatt, et al. "Don't settle for eventual: scalable causal consistency for wide-area storage with COPS." Proceedings of the Twenty-Third ACM Symposium on Operating Systems Principles. ACM, 2011. LINK
  • Low, Yucheng, et al. "Distributed GraphLab: a framework for machine learning and data mining in the cloud." Proceedings of the VLDB Endowment 5.8 (2012): 716-727. LINK
  • Mitchell, Christopher, Russell Power, and Jinyang Li. "Oolong: Programming Asynchronous Distributed Applications with Triggers." Proc. SOSP. 2011. LINK
  • Mitchell, Christopher, Russell Power, and Jinyang Li. "Oolong: asynchronous distributed applications made easy." Proceedings of the Asia-Pacific Workshop on Systems. ACM, 2012. LINK
  • Power, Russell, and Jinyang Li. "Piccolo: building fast, distributed programs with partitioned tables." Proceedings of the 9th USENIX conference on Operating systems design and implementation. USENIX Association, 2010. LINK
  • Harter, Tyler, et al. "A file is not a file: understanding the I/O behavior of Apple desktop applications." Proceedings of the Twenty-Third ACM Symposium on Operating Systems Principles. ACM, 2011. LINK

Basic Theory

[Chubby] Burrows, Mike. "The Chubby lock service for loosely-coupled distributed systems." Proceedings of the 7th symposium on Operating systems design and implementation. USENIX Association, 2006. http://www.usenix.org/event/osdi06/tech/full_papers/burrows/burrows_html/

Armbrust, M.; Fox, A.; Griffith, R.; Joseph, A.; Katz, R.; Konwinski, A.; Lee, G.; Patterson, D.; Rabkin, A.; Stoica, I. & others "A view of cloud computing" Communications of the ACM, ACM, 2010, 53, 50-58. http://x-integrate.de/x-in-cms.nsf/id/DE_Von_Regenmachern_und_Wolkenbruechen_-_Impact_2009_Nachlese/$file/abovetheclouds.pdf

Programming Model

Abadi, D. J., Ahmad, Y., Balazinska, M., Cetintemel, U., Cherniack, M., Hwang, J. H., ... & Zdonik, S. (2005, January). The design of the borealis stream processing engine. CIDR. http://www.cs.harvard.edu/~mdw/course/cs260r/papers/borealis-cidr05.pdf

Dean, Jeffrey, and Sanjay Ghemawat. "MapReduce: simplified data processing on large clusters." Communications of the ACM 51.1 (2008): 107-113. http://fastandfuriousdecisiontree.googlecode.com/svn-history/r474/trunk/DIVERS/mapReduceByGoogle.pdf

Neumeyer, L., Robbins, B., Nair, A., & Kesari, A. (2010, December). S4: Distributed stream computing platform. In Data Mining Workshops (ICDMW), 2010 IEEE International Conference on (pp. 170-177). IEEE. http://www.4lunas.org/pub/2010-s4.pdf

Zaharia, M.; Chowdhury, M.; Franklin, M.; Shenker, S. & Stoica, I. "Spark: cluster computing with working sets" Proceedings of the 2nd USENIX conference on Hot topics in cloud computing, 2010, 10-10   http://www.usenix.org/event/hotcloud10/tech/full_papers/Zaharia.pdf

Zaharia, Matei, et al. "Discretized streams: an efficient and fault-tolerant model for stream processing on large clusters." Proceedings of the 4th USENIX conference on Hot Topics in Cloud Computing. USENIX Association, 2012. http://www.cs.berkeley.edu/~matei/papers/2012/hotcloud_spark_streaming.pdf

Gunda, Pradeep Kumar, et al. "Nectar: automatic management of data and computation in datacenters." Proceedings of the 9th USENIX conference on Operating systems design and implementation. USENIX Association, 2010. http://static.usenix.org/events/osdi10/tech/full_papers/Gunda.pdf

Peng, Daniel, and Frank Dabek. "Large-scale incremental processing using distributed transactions and notifications." Proceedings of the 9th USENIX conference on Operating systems design and implementation. USENIX Association, 2010. http://www.usenix.org/event/osdi10/tech/full_papers/Peng.pdf

Low, Yucheng, et al. "Distributed GraphLab: a framework for machine learning and data mining in the cloud." Proceedings of the VLDB Endowment 5.8 (2012): 716-727. http://arxiv.org/pdf/1204.6078

Mitchell, Christopher, Russell Power, and Jinyang Li. "Oolong: Programming Asynchronous Distributed Applications with Triggers." Proc. SOSP. 2011. http://sigops.org/sosp/sosp11/posters/summaries/sosp11-final6.pdf

Mitchell, Christopher, Russell Power, and Jinyang Li. "Oolong: asynchronous distributed applications made easy." Proceedings of the Asia-Pacific Workshop on Systems. ACM, 2012. https://apsys2012.kaist.ac.kr/media/papers/apsys2012-final28.pdf

Power, Russell, and Jinyang Li. "Piccolo: building fast, distributed programs with partitioned tables." Proceedings of the 9th USENIX conference on Operating systems design and implementation. USENIX Association, 2010. http://www.usenix.org/event/osdi10/tech/full_papers/Power.pdf

McSherry, Frank, et al. "Differential dataflow." Conference on Innovative Data Systems Research (CIDR). 2013. http://research.microsoft.com/pubs/176693/differentialdataflow.pdf

Improvements on Existing Models

Zaharia, M.; Konwinski, A.; Joseph, A.; Katz, R. & Stoica, I. "Improving mapreduce performance in heterogeneous environments" Proceedings of the 8th USENIX conference on Operating systems design and implementation, 2008, 29-42. http://www.usenix.org/event/osdi08/tech/full_papers/zaharia/zaharia_html/

Yang, Hung-chih, et al. "Map-reduce-merge: simplified relational data processing on large clusters." Proceedings of the 2007 ACM SIGMOD international conference on Management of data. ACM, 2007. http://www.cs.duke.edu/courses/cps399.28/current/papers/sigmod07-YangDasdanEtAl-map_reduce_merge.pdf

Bu, Yingyi, et al. "HaLoop: Efficient iterative data processing on large clusters." Proceedings of the VLDB Endowment 3.1-2 (2010): 285-296. http://vldb2010.org/proceedings/files/papers/R25.pdf

Borthakur, Dhruba, et al. "Apache Hadoop goes realtime at Facebook." Proceedings of the 2011 international conference on Management of data. ACM, 2011. http://oss.csie.fju.edu.tw/~tzu98/Apache%20Hadoop%20Goes%20Realtime%20at%20Facebook.pdf

Ekanayake, Jaliya, et al. "Twister: a runtime for iterative mapreduce." Proceedings of the 19th ACM International Symposium on High Performance Distributed Computing. ACM, 2010. http://www.iterativemapreduce.org/hpdc-camera-ready-submission.pdf

Zhang, Yanfeng, et al. "Priter: a distributed framework for prioritized iterative computations." Proceedings of the 2nd ACM Symposium on Cloud Computing. ACM, 2011. http://rio.ecs.umass.edu/mnilpub/papers/socc11-zhang.pdf

Bhatotia, Pramod, et al. "Incoop: MapReduce for incremental computations." Proceedings of the 2nd ACM Symposium on Cloud Computing. ACM, 2011. https://www.systems.ethz.ch/education/spring-2012/hotDMS/papers/incoop-socc11.pdf

Storage

Ghemawat, Sanjay, Howard Gobioff, and Shun-Tak Leung. "The Google file system." ACM SIGOPS Operating Systems Review. Vol. 37. No. 5. ACM, 2003. ftp://121.9.13.178/PPP/ppt-hadoop/The.Google.File.System.pdf

Chang, Fay, et al. "Bigtable: A distributed storage system for structured data." ACM Transactions on Computer Systems (TOCS) 26.2 (2008): 4. http://static.usenix.org/event/osdi06/tech/chang/chang_html/

DeCandia, G., Hastorun, D., Jampani, M., Kakulapati, G., Lakshman, A., Pilchin, A., ... & Vogels, W. (2007, October). Dynamo: amazon's highly available key-value store. In ACM SIGOPS Operating Systems Review (Vol. 41, No. 6, pp. 205-220). ACM. http://www.read.seas.harvard.edu/~kohler/class/cs239-w08/decandia07dynamo.pdf

McKusick, Marshall Kirk, and Sean Quinlan. "GFS: Evolution on fast-forward." ACM Queue 7.7 (2009): 10-20. http://queue.acm.org/detail.cfm?id=1594206

Weil, Sage A., et al. "Ceph: A scalable, high-performance distributed file system." Proceedings of the 7th Symposium on Operating Systems Design and Implementation (OSDI). 2006. https://www.usenix.org/legacyurl/osdi-06-paper-4

Lakshman, Avinash, and Prashant Malik. "Cassandra—A decentralized structured storage system." Operating systems review 44.2 (2010): 35. http://www.cs.cornell.edu/Projects/ladis2009/papers/Lakshman-ladis2009.PDF

Baker, J., Bond, C., Corbett, J. C., Furman, J. J., Khorlin, A., Larson, J., ... & Yushprakh, V. (2011, January). Megastore: Providing scalable, highly available storage for interactive services. In Proc. of CIDR (pp. 223-234). http://pdos.csail.mit.edu/6.824-2012/papers/jbaker-megastore.pdf

Ousterhout, John, Parag Agrawal, David Erickson, Christos Kozyrakis, Jacob Leverich, David Mazières, Subhasish Mitra et al. "The case for RAMCloud." Communications of the ACM 54, no. 7 (2011): 121-130. http://ilpubs.stanford.edu:8090/942/1/ramcloud.pdf

Ongaro, Diego, Stephen M. Rumble, Ryan Stutsman, John Ousterhout, and Mendel Rosenblum. "Fast crash recovery in RAMCloud." In Proceedings of the Twenty-Third ACM Symposium on Operating Systems Principles, pp. 29-41. ACM, 2011. http://www.cs.columbia.edu/~junfeng/11fa-e6121/papers/ramcloud-recovery.pdf

Lloyd, Wyatt, et al. "Don't settle for eventual: scalable causal consistency for wide-area storage with COPS." Proceedings of the Twenty-Third ACM Symposium on Operating Systems Principles. ACM, 2011. http://www-users.cselabs.umn.edu/classes/Fall-2012/csci8980-2/papers/cops.pdf

Stonebraker, M., Abadi, D. J., Batkin, A., Chen, X., Cherniack, M., Ferreira, M., ... & Zdonik, S. (2005, August). C-store: a column-oriented DBMS. In Proceedings of the 31st international conference on Very large data bases (pp. 553-564). VLDB Endowment. http://people.csail.mit.edu/tdanford/6830papers/stonebraker-cstore.pdf

Hall, A., Bachmann, O., Büssow, R., Gănceanu, S., & Nunkesser, M. (2012). Processing a trillion cells per mouse click. Proceedings of the VLDB Endowment, 5(11), 1436-1446. http://vldb.org/pvldb/vol5/p1436_alexanderhall_vldb2012.pdf

Abadi, Daniel J., Samuel R. Madden, and Nabil Hachem. "Column-Stores vs. Row-Stores: How different are they really?." Proceedings of the 2008 ACM SIGMOD international conference on Management of data. ACM, 2008. http://www.courses.fas.harvard.edu/~cs265/papers/abadi-2008.pdf

Scheduler

Zaharia, M.; Borthakur, D.; Sen Sarma, J.; Elmeleegy, K.; Shenker, S. & Stoica, I. "Delay scheduling: a simple technique for achieving locality and fairness in cluster scheduling" Proceedings of the 5th European conference on Computer systems, 2010, 265-278.  http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.212.1524&rep=rep1&type=pdf

Hindman, Benjamin, et al. "Mesos: A platform for fine-grained resource sharing in the data center." Proceedings of the 8th USENIX conference on Networked systems design and implementation. USENIX Association, 2011. http://incubator.apache.org/mesos/papers/nsdi_mesos.pdf

Ghodsi, A., Zaharia, M., Hindman, B., Konwinski, A., Shenker, S., & Stoica, I. (2011, March). Dominant resource fairness: fair allocation of multiple resource types. In USENIX NSDI. http://static.usenix.org/event/nsdi11/tech/full_papers/Ghodsi.pdf

Systems

Melnik, S., Gubarev, A., Long, J. J., Romer, G., Shivakumar, S., Tolton, M., & Vassilakis, T. (2010). Dremel: interactive analysis of web-scale datasets. Proceedings of the VLDB Endowment, 3(1-2), 330-339. http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en/us/pubs/archive/36632.pdf

Cluster Management

Applications

Sunday, January 27, 2013

We Shall Follow Prismatic

Prismatic is a news recommendation site created by several Berkeley students. Three of them are well-known PhD students at Berkeley: Aria Haghighi (who worked with Dan Klein on NLP, and with Andrew Ng as an undergraduate at Stanford), Jenny Finkel (who worked with Chris on NLP), and Jason Wolfe (who worked with Stuart Russell in the AI lab). It recently raised a 15 million dollar Series A round from Jim Breyer and Yuri Milner to "Attack The Impossible Problem of Bringing You Relevant News".

In fact, there were many attempts before Prismatic to solve the hard problem of "what shall I read now", including some not-so-successful ones such as Zite, Pulse, Flipboard, Digg, StumbleUpon, and Wavii. The core challenge is that computers still cannot understand the real content of a news article or the needs of a human being. The timing is tricky as well: what readers want to read now is clearly less regular than what they want to read eventually.

Many people think Prismatic is just another immature attempt at this impossible problem. In my view, however, Prismatic has grasped the core of this type of application.

  1. Using the social network. Asking computers to understand real humans is not possible, but asking your friends is. Gathering enough material from your social network is critical to solving this problem.
  2. Analyzing persons instead of content. People carry different kinds of tags: a geek, an artist, a student, or even a news reporter. Among the geeks alone there are still many subtypes. News and articles carry different weights for different groups of people. Besides, the interaction between the system and a person can further affect the system in a stable way.
  3. Strong academic background. Machine learning and NLP are still hot research topics today. PhD students at the best universities will find it easier to deploy the obviously 'better' system.

Shall we follow Prismatic?

Is that too hard for us ordinary programmers, who hold no doctorate in ML or NLP and do not study in the world's best laboratories or universities? The answer is yes, but there is another way: "a simple version of Prismatic".

For example, I am a social network addict. Each day I spend at least one hour (I call it lunch-social time) on different social networks: Sina Weibo (a Twitter-like micro-blog system), Facebook, AcFun (an interesting video site like YouTube), and many others. Outside this hour, I do not like social media to disrupt my work. In the first quarter of the lunch-social hour there are always interesting posts or news that I like to read, but after around 15 minutes I usually find there is no new content, and I spend the next 45 minutes refreshing the page again and again, trying to find one interesting page. (Of course, none of the posts I read during this hour are serious; they are usually jokes, breaking news, funny comics, or popular short videos.) How to help me? You need:


A Simple Version of Prismatic:

  1. Narrow your content down to fun stuff only: "funny comics, videos, stories", "breaking news", "social trends", and so on. Other high-quality blogs or posts can be left to Prismatic. People usually do not criticize a system harshly for a not-so-accurate recommendation when they are just looking for fun.
  2. Classify your users into classes pre-defined by humans instead of relying on unsupervised learning by computers. A heavy social network user can easily predict whether a post will be popular or not. Humans really are better than computers at such problems.

I would like to try it anyway, no matter how hard it is, because I want to use such a tool during my lunch-social time. :)


Here are some reference materials about Prismatic. You can check them out if you are also interested:

Tuesday, January 22, 2013

MVCC Writes in HBase

In a post on the HBase community blog, the author (Gregory Chanan) describes how MVCC works in HBase, especially on the read side. Here is the figure he used to demonstrate the idea.


Fig. 1  Write Steps with MVCC

In this figure, each write operation needs to obtain a RowLock before performing its actual WAL and Memstore writes. This sequential behavior enforced by the lock is time-consuming and, in my opinion, not necessary. We can guarantee that writes do not overlap each other by giving each write a local sequence number: seqn.

1) If a thread tries to update a cell and finds that the seqn of its write is less than the cell's largest "finished seqn", the update is discarded; otherwise the write is performed on the WAL and Memstore. When every cell that a write operation needs to modify has been either applied or discarded, its seqn is declared finished.

2) Each read operation is assigned a read timestamp, which is the highest value x such that all writes with write number <= x have completed. The read for a given (row, column) then returns the data cell whose write number is the largest value less than or equal to the read timestamp.
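The two rules above can be sketched as a small in-memory model. This is only an illustration of the bookkeeping under stated assumptions, not HBase code: the class and method names (`MvccSketch`, `beginWrite`, `writeCell`, `finishWrite`, `beginRead`, `readCell`) are made up for this example, a `TreeMap` stands in for the WAL and Memstore, and the methods are `synchronized` for brevity, so the sketch shows what is tracked rather than how to make it lock-free.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Bookkeeping sketch for the seqn-based write scheme; all names are illustrative. */
final class MvccSketch {
    /** Versions of one cell, keyed by the seqn of the write that produced them. */
    private static final class Cell {
        final TreeMap<Long, String> versions = new TreeMap<>();
        long largestFinishedSeqn = 0;
    }

    private final Map<String, Cell> cells = new HashMap<>();
    private final Map<Long, Set<String>> inFlight = new HashMap<>(); // seqn -> cells touched
    private final Set<Long> finishedAhead = new HashSet<>();         // finished seqns above readPoint
    private long nextSeqn = 0;
    private long readPoint = 0; // every write with seqn <= readPoint has finished

    synchronized long beginWrite() {
        long seqn = ++nextSeqn;
        inFlight.put(seqn, new HashSet<>());
        return seqn;
    }

    /** Rule 1: discard the update if a write with a larger seqn already finished on this cell. */
    synchronized boolean writeCell(long seqn, String name, String value) {
        Cell cell = cells.computeIfAbsent(name, k -> new Cell());
        inFlight.get(seqn).add(name);
        if (seqn < cell.largestFinishedSeqn) {
            return false; // discarded
        }
        cell.versions.put(seqn, value); // stands in for the WAL and Memstore writes
        return true;
    }

    /** Declares seqn finished and advances the read point while there is no gap. */
    synchronized void finishWrite(long seqn) {
        for (String name : inFlight.remove(seqn)) {
            Cell cell = cells.get(name);
            cell.largestFinishedSeqn = Math.max(cell.largestFinishedSeqn, seqn);
        }
        finishedAhead.add(seqn);
        while (finishedAhead.remove(readPoint + 1)) {
            readPoint++;
        }
    }

    /** Rule 2, first half: the read timestamp is the current read point. */
    synchronized long beginRead() {
        return readPoint;
    }

    /** Rule 2, second half: newest version whose seqn is <= the read timestamp. */
    synchronized String readCell(long readTimestamp, String name) {
        Cell cell = cells.get(name);
        if (cell == null) {
            return null;
        }
        Map.Entry<Long, String> newestVisible = cell.versions.floorEntry(readTimestamp);
        return newestVisible == null ? null : newestVisible.getValue();
    }

    public static void main(String[] args) {
        MvccSketch store = new MvccSketch();
        long w1 = store.beginWrite();
        long w2 = store.beginWrite();
        store.writeCell(w1, "cell-2", "w1");
        store.writeCell(w2, "cell-2", "w2");
        store.finishWrite(w1);
        long read = store.beginRead(); // w2 is still in flight
        System.out.println(store.readCell(read, "cell-2")); // prints w1
    }
}
```

In this model a reader never sees a value from an unfinished write, because versions above the read point are simply skipped by `floorEntry`.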

Figure 2 shows a typical lock-free MVCC write situation. There are two write operations, with seqn=1 and seqn=2. The first write updates cell-1 and cell-3; the second write updates cell-2 and cell-3. Besides these two writes, there are two reads: read-1 begins just after the first write succeeds and before the second write finishes; read-2 begins after the second write finishes.

Fig. 2  MVCC Writes Use Cases.

From Fig. 2, we can conclude that every cell ends up updated by the write operation with the higher seqn, and that read operations are guaranteed to get values written by finished writes. For simplicity, however, Fig. 2 does not show concurrent writes.

Fig. 3 shows two write operations with seqn=1 and seqn=2. The first write updates three cells (1, 2, 3) and the second write updates two cells (2, 3). When the read operation with sequence number 1 begins, the first write has finished and the second write is still in progress. This read therefore gets all the cell values written by sequence number 1, without any partial updates.


Fig. 3  MVCC Concurrent Writes

The MVCC write strategy improves HBase writes in two ways: 1) it allows different writes to execute in parallel; 2) it allows us to discard the parts of a write operation whose sequence number is smaller than the current finished seqn.

Tuesday, January 15, 2013

HBase 0.95 Source Code Analysis


As the HBase homepage describes it: Apache HBase is an open-source, distributed, versioned, column-oriented store modeled after Google's Bigtable: A Distributed Storage System for Structured Data by Chang et al. Just as Bigtable leverages the distributed data storage provided by the Google File System, Apache HBase provides Bigtable-like capabilities on top of Hadoop and HDFS.

HBase 0.95 is the most recent version. In this post, we will record the source code analysis from several aspects: 1) how clients connect to HBase; 2) how servers inside HBase connect to each other; 3) how data is written to or read from HBase; 4) finally, the important APIs and their implementations. To keep this article useful to people who are not currently reading the source, I will try my best not to list too much code; instead, I will describe how it works and why. I hope this post will be helpful for anyone interested in HBase.


SECTION 1. How Clients Connect to HBase

When programmers use the Java client, connecting to HBase is at its simplest. An example is shown below:

HTable htable = ...   // instantiate HTable
// Read the row whose key is "row1".
Get get = new Get(Bytes.toBytes("row1"));
Result r = htable.get(get);
// Write one cell in column family "cf", qualifier "attr1"; row and data are supplied by the caller.
Put put = new Put(Bytes.toBytes(row));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("attr1"), Bytes.toBytes(data));
htable.put(put);
In the HBase source, the first step is instantiating an HTable. To do so, you should at least provide the "table name" and a "configuration" instance, which calls:
public HTable(Configuration conf, final byte [] tableName)  
One of the most important tasks of this constructor is to get an "HConnection" instance and to create a ThreadPoolExecutor. All "HConnection"s are managed by HConnectionManager. HConnectionManager has an inner class named HConnectionKey, which captures the parameters set by the configuration file of an HBase instance, and HBase picks the HConnectionImplementation that matches this HConnectionKey. This strategy means HBase reuses connections when HTables come from the same user with the same configuration. So the real connection from user clients to HBase servers is implemented by HConnectionManager.HConnectionImplementation. The constructor of HConnectionImplementation sets all the private variables, such as conf, managed, pause, numRetries, maxRPCAttempts, rpcTimeout, and the cluster ID. Besides these fields, two class objects are created for future use: adminClass and clientClass. These two variables are created through Java reflection like this:
String adminClassName = conf.get(REGION_PROTOCOL_CLASS,
    DEFAULT_ADMIN_PROTOCOL_CLASS);
this.adminClass =
    (Class<? extends AdminProtocol>) Class.forName(adminClassName);
String clientClassName = conf.get(CLIENT_PROTOCOL_CLASS,
    DEFAULT_CLIENT_PROTOCOL_CLASS);
this.clientClass =
    (Class<? extends ClientProtocol>) Class.forName(clientClassName);
When an application uses an HConnection, HConnectionImplementation is in charge of returning an RPC protocol instance according to the request (for admin or for client). Hence there is a "getProtocol" method in HConnectionImplementation. The RPC implementation is the core concept for understanding how clients connect to HBase, so the next section describes it in detail.
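The connection reuse described above can be pictured as a cache keyed by the configuration values that identify a connection. The sketch below is a simplified model, not the HConnectionManager code: `ConnectionKey`, `SharedConnection`, and `ConnectionCache` are hypothetical names, and the choice of quorum and user as key fields is an assumption made only for illustration.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for HConnectionKey: the settings that identify a connection. */
final class ConnectionKey {
    private final String zkQuorum;
    private final String user;

    ConnectionKey(String zkQuorum, String user) {
        this.zkQuorum = zkQuorum;
        this.user = user;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ConnectionKey)) {
            return false;
        }
        ConnectionKey other = (ConnectionKey) o;
        return zkQuorum.equals(other.zkQuorum) && user.equals(other.user);
    }

    @Override
    public int hashCode() {
        return Objects.hash(zkQuorum, user);
    }
}

/** Hypothetical stand-in for the shared connection object. */
final class SharedConnection {
    final ConnectionKey key;

    SharedConnection(ConnectionKey key) {
        this.key = key;
    }
}

/** Hands out one connection per distinct key, so equal settings share a connection. */
final class ConnectionCache {
    private final Map<ConnectionKey, SharedConnection> connections = new ConcurrentHashMap<>();

    SharedConnection getConnection(String zkQuorum, String user) {
        return connections.computeIfAbsent(new ConnectionKey(zkQuorum, user), SharedConnection::new);
    }

    int size() {
        return connections.size();
    }

    public static void main(String[] args) {
        ConnectionCache cache = new ConnectionCache();
        SharedConnection a = cache.getConnection("zk1:2181", "alice");
        SharedConnection b = cache.getConnection("zk1:2181", "alice");
        System.out.println(a == b); // prints true
    }
}
```

The important detail is that the key implements `equals` and `hashCode` over its fields, so two tables opened with equal settings resolve to the same cached entry.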

1.1 RPC and Protocol in HBase

RPC in HBase works as in Hadoop: neither uses the built-in Java RPC mechanism; instead, both implement their own RPC mechanism in a similar way. The difference is that HBase also uses Protobuf as the serialization component.

All the RPC-related source files are located in the ./org/apache/hadoop/hbase/ipc folder. All the protocol-related source files, including the .proto files and the generated *Proto files, are located in the hadoop-protocol folders.

Before discussing how HBase implements RPC and combines it with Protobuf, we first introduce the built-in dynamic proxy mechanism in Java. It works like the Proxy design pattern, except that the Proxy class and the InvocationHandler interface make the pattern automatic in Java.
Fig.1 Java Dynamic Proxy

As Fig.1 shows, the proxy object is generated by calling the Proxy.newProxyInstance method instead of being written by hand, and each method call is automatically forwarded to the real object through the InvocationHandler's invoke method.

In HBase, the RPC classes map onto this pattern as Fig.2 shows:
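A minimal, self-contained example of the mechanism follows. `Proxy.newProxyInstance` and `InvocationHandler` are the standard `java.lang.reflect` API; the `Greeter` interface, `RealGreeter`, and `RecordingHandler` are names invented for this sketch.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Illustrative interface; the proxy will implement it at runtime. */
interface Greeter {
    String greet(String name);
}

/** The "real object" that finally does the work. */
final class RealGreeter implements Greeter {
    @Override
    public String greet(String name) {
        return "hello " + name;
    }
}

/** Receives every call made on the proxy, records it, then forwards it to the target. */
final class RecordingHandler implements InvocationHandler {
    private final Object target;
    final List<String> calls = new ArrayList<>();

    RecordingHandler(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        calls.add(method.getName());
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause(); // rethrow what the real object threw
        }
    }
}

final class DynamicProxyDemo {
    /** Generates the proxy object instead of writing a Greeter wrapper by hand. */
    static Greeter proxyFor(RecordingHandler handler) {
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[] {Greeter.class},
                handler);
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler(new RealGreeter());
        Greeter greeter = proxyFor(handler);
        System.out.println(greeter.greet("HBase")); // prints hello HBase
        System.out.println(handler.calls);          // prints [greet]
    }
}
```

The caller only ever sees the `Greeter` interface, which is what lets an RPC layer later swap the local forwarding for a network call without changing client code.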
Fig.2 HBase RPC protocol dynamic proxy mechanism

HBase uses many concurrency techniques to speed up client reads and writes. To connect to a RegionServer of HBase, we need a "ClientProtocol" instance, which is generated by calling HConnectionImplementation's getClient method. Starting from here, a chain of calls ties RPC and Protobuf together:

HConnectionImplementation.getClient -> getProtocol -> HBaseClientRPC.waitForProxy -> HBaseClientRPC.getProxy -> RpcClientEngine.getProxy -> ProtobufRpcClientEngine.getProxy

Here, the ProtobufRpcClientEngine.getProxy() method generates a proxy instance for the core 'ClientProtocol' instance that comes from the "HCI.getClient()" method. The "invoke" function in ProtobufRpcClientEngine's static "Invoker" class does the RPC client work:
public Object invoke(Object proxy, Method method, Object[] args)
    throws ServiceException {
  long startTime = 0;
  if (LOG.isDebugEnabled()) {
    startTime = System.currentTimeMillis();
  }
  // Turn the invoked method and its arguments into a protobuf request.
  RpcRequestBody rpcRequest = constructRpcRequest(method, args);
  Message val = null;
  try {
    // Send the request to the remote server and wait for the reply.
    val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout);
    if (LOG.isDebugEnabled()) {
      long callTime = System.currentTimeMillis() - startTime;
      if (LOG.isTraceEnabled()) LOG.trace("Call: " + method.getName() + " " + callTime);
    }
    return val;
  } catch (Throwable e) {
    // Unwrap remote failures so the caller sees the original cause.
    if (e instanceof RemoteException) {
      Throwable cause = ((RemoteException)e).unwrapRemoteException();
      throw new ServiceException(cause);
    }
    throw new ServiceException(e);
  }
}
It constructs the RPC request and calls client.call() to do the real communication. Here, HBase uses Google's Protobuf to construct the RPC request: an RpcRequestBody. The "HBaseClient" class then takes charge of all the RPC requests and returns a Message object as the result.

At this point the client-side code is done: the method call on the ClientProtocol is collected, re-encoded by Protobuf, and sent to the RegionServer. SECTION 1 finished.
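To make the whole client path concrete, here is a toy RPC stub built on the same idea: a dynamic proxy whose handler turns each interface call into a request object and hands it to a transport. Everything in it is an assumption for illustration: `KeyValueProtocol` plays the role of ClientProtocol, `RpcRequest` stands in for the protobuf RpcRequestBody, and `Transport` stands in for HBaseClient.call(); the "server" is an in-memory dispatcher rather than a socket.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/** Hypothetical protocol interface, playing the role ClientProtocol plays in the text. */
interface KeyValueProtocol {
    String get(String row);
}

/** Hypothetical request format; HBase builds a protobuf RpcRequestBody instead. */
final class RpcRequest {
    final String methodName;
    final Object[] args;

    RpcRequest(String methodName, Object[] args) {
        this.methodName = methodName;
        this.args = args;
    }
}

/** Stands in for the client transport; a real one would serialize and use a socket. */
interface Transport {
    Object call(RpcRequest request) throws Exception;
}

final class RpcStubDemo {
    /** Client side: every call on the returned proxy becomes a request for the transport. */
    static <T> T getProxy(Class<T> protocol, Transport transport) {
        InvocationHandler invoker = (proxy, method, args) ->
                transport.call(new RpcRequest(method.getName(), args));
        return protocol.cast(Proxy.newProxyInstance(
                protocol.getClassLoader(), new Class<?>[] {protocol}, invoker));
    }

    /** Server side: find the method by name (overloads are ignored here) and run it. */
    static Transport inMemoryServer(Object implementation, Class<?> protocol) {
        return request -> {
            for (Method m : protocol.getMethods()) {
                if (m.getName().equals(request.methodName)) {
                    return m.invoke(implementation, request.args);
                }
            }
            throw new NoSuchMethodException(request.methodName);
        };
    }

    public static void main(String[] args) {
        KeyValueProtocol server = row -> "value-of-" + row;
        KeyValueProtocol client =
                getProxy(KeyValueProtocol.class, inMemoryServer(server, KeyValueProtocol.class));
        System.out.println(client.get("row1")); // prints value-of-row1
    }
}
```

Swapping `inMemoryServer` for a network transport would not change the calling code, which is the property the proxy-based design buys.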



SECTION 2. How Servers inside HBase Connect to Each other

There are two different roles in HBase: HMaster and HRegionServer. An HMaster is elected and runs from then on, while an HRegionServer runs on each server. The entry point of HMaster is in the HMaster.java file, which contains the static main method. All the main functions in HBase delegate to an abstract ServerCommandLine class and its doMain method. Inside doMain, HBase calls ToolRunner.run(conf, this, arg) to do the real startup job. The ToolRunner class is defined in Hadoop Core, so we won't describe it here. The only thing we need to know is that once ToolRunner.run begins, it calls the "this.run" method. In our case, this runs the HMasterCommandLine.run method, which finally calls the "startMaster" method.

After the arguments are parsed, control returns to HBase again: 1) call HMaster.constructMaster() to build an HMaster instance; 2) call the start method of this instance; 3) call the join method of this instance; 4) check for errors.

In constructMaster(), HBase uses reflection to build the HMaster class instance. In fact, I am still curious why a new instance is constructed this way. Is it faster, or more flexible?
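One plausible answer is flexibility rather than speed: when the class to build is passed in (or named in configuration), a subclass can be substituted without touching the startup code. This is an assumption about the motivation, not something the source confirms. The sketch below uses made-up classes (`Master`, `LocalMaster`) and only standard reflection calls.

```java
import java.lang.reflect.Constructor;

/** Hypothetical stand-in for the master class. */
class Master {
    protected final String clusterName;

    public Master(String clusterName) {
        this.clusterName = clusterName;
    }

    String describe() {
        return "Master(" + clusterName + ")";
    }
}

/** Hypothetical alternative implementation, e.g. one used for tests or a local cluster. */
class LocalMaster extends Master {
    public LocalMaster(String clusterName) {
        super(clusterName);
    }

    @Override
    String describe() {
        return "LocalMaster(" + clusterName + ")";
    }
}

final class ReflectiveConstruction {
    /** Builds whichever Master subclass the caller asks for through its public constructor. */
    static Master constructMaster(Class<? extends Master> masterClass, String clusterName)
            throws ReflectiveOperationException {
        Constructor<? extends Master> constructor = masterClass.getConstructor(String.class);
        return constructor.newInstance(clusterName);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        // The class name would normally come from configuration; it is hard-wired here.
        String configuredName = LocalMaster.class.getName();
        Class<? extends Master> configured = Class.forName(configuredName).asSubclass(Master.class);
        System.out.println(constructMaster(Master.class, "demo").describe()); // prints Master(demo)
        System.out.println(constructMaster(configured, "demo").describe());   // prints LocalMaster(demo)
    }
}
```

Reflection is generally not faster than a direct `new`, so if flexibility is the reason, the cost is paid once at startup where it does not matter.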

As HMaster is a subclass of the abstract class HasThread, it is automatically Runnable. Calling start means the HMaster thread begins to work, and join means the JVM won't exit until HMaster exits. The way HMaster starts is interesting: every server starts an HMaster instance when users boot up a cluster, and these servers race with each other to write something to ZooKeeper. As ZooKeeper guarantees, only one HMaster succeeds. All the others block and wait until the successful one fails, and then race again. The successful one initializes itself and enters an infinite loop to provide service.

From this point on, HMaster has started, and all the RegionServers will connect to it.

As described before, every server also runs an HRegionServer (HRS for short). HRS starts in exactly the same way as HMaster: the main function calls the HRegionServerCommandLine.doMain() method. In the run method of HRS, we first try to register with the HMaster to tell it we are here. After registering with HMaster, HRS goes into an infinite loop. Inside the loop, HRS takes care of region-stopping work and reports to HMaster every msgInterval. The heartbeat report is implemented by the function tryRegionServerReport(lastMsg, now). HRS has now started, and we can communicate inside HBase.
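The race can be modeled without ZooKeeper by an atomic compare-and-set on a single slot: exactly one candidate wins, and the slot becoming empty again models the active master failing. `MasterLock` and `MasterElectionDemo` are hypothetical names, and the `AtomicReference` is only a stand-in for the node the candidates try to create in ZooKeeper.

```java
import java.util.concurrent.atomic.AtomicReference;

/** In-memory stand-in for the ZooKeeper node that the candidates race to create. */
final class MasterLock {
    private final AtomicReference<String> owner = new AtomicReference<>();

    /** Succeeds for exactly one candidate while the slot is taken by nobody. */
    boolean tryAcquire(String candidate) {
        return owner.compareAndSet(null, candidate);
    }

    /** Models the node disappearing when the active master fails. */
    void release(String candidate) {
        owner.compareAndSet(candidate, null);
    }

    String active() {
        return owner.get();
    }
}

final class MasterElectionDemo {
    public static void main(String[] args) throws InterruptedException {
        MasterLock lock = new MasterLock();
        Thread[] candidates = new Thread[3];
        for (int i = 0; i < candidates.length; i++) {
            String name = "server-" + i;
            candidates[i] = new Thread(() -> {
                if (lock.tryAcquire(name)) {
                    System.out.println(name + " is the active master");
                } else {
                    System.out.println(name + " waits as a backup");
                }
            });
            candidates[i].start();
        }
        for (Thread candidate : candidates) {
            candidate.join();
        }
    }
}
```

Which server wins depends on thread scheduling, so the output order varies from run to run; only the count of winners (one) is fixed.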
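The reporting part of that loop boils down to "report only when a full interval has passed since the last report". A deterministic sketch follows, with the clock passed in as a parameter; `HeartbeatSketch` and `tick` are invented names, and treating msgInterval as a value in milliseconds is an assumption of this example.

```java
/** Decides on each loop pass whether a status report to the master is due. */
final class HeartbeatSketch {
    private final long msgIntervalMillis;
    private long lastReportMillis;
    private int reportsSent = 0;

    HeartbeatSketch(long msgIntervalMillis, long startMillis) {
        this.msgIntervalMillis = msgIntervalMillis;
        this.lastReportMillis = startMillis;
    }

    /** One pass of the main loop; returns true when a report was sent. */
    boolean tick(long nowMillis) {
        if (nowMillis - lastReportMillis < msgIntervalMillis) {
            return false; // too early, keep doing other work
        }
        reportsSent++; // a real server would send its status to the master here
        lastReportMillis = nowMillis;
        return true;
    }

    int reportsSent() {
        return reportsSent;
    }

    public static void main(String[] args) {
        HeartbeatSketch heartbeat = new HeartbeatSketch(3000, 0);
        for (long now = 1000; now <= 9000; now += 1000) {
            heartbeat.tick(now);
        }
        System.out.println(heartbeat.reportsSent()); // prints 3
    }
}
```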



2.1 RPC Server Start in HBase

We know that there are different RPC protocols in HBase, and that only two kinds of server, HMaster and HRegionServer, run in an HBase instance. So each server works as an RPC server for different protocols.

HMaster implements: MasterMonitorProtocol, MasterAdminProtocol, RegionServerStatusProtocol
HRS implements: ClientProtocol, AdminProtocol

Though the protocols they implement are quite different, the RPC server start procedures are quite similar. For example, in HRS, the RPC server is initialized in its constructor as follows:
 this.rpcServer = HBaseServerRPC.getServer(AdminProtocol.class, this,  
     new Class<?>[]{ClientProtocol.class,  
       AdminProtocol.class, HBaseRPCErrorHandler.class,  
       OnlineRegions.class},  
     initialIsa.getHostName(), // BindAddress is IP we got for this server.  
     initialIsa.getPort(),  
     conf.getInt("hbase.regionserver.handler.count", 10),  
     conf.getInt("hbase.regionserver.metahandler.count", 10),  
     conf.getBoolean("hbase.rpc.verbose", false),  
     conf, HConstants.QOS_THRESHOLD);  
In HMaster, the RPC server is initialized in exactly the same place with similar code:
 this.rpcServer = HBaseServerRPC.getServer(MasterMonitorProtocol.class, this,  
     new Class<?>[]{MasterMonitorProtocol.class,  
       MasterAdminProtocol.class, RegionServerStatusProtocol.class},  
     initialIsa.getHostName(), // BindAddress is IP we got for this server.  
     initialIsa.getPort(),  
     numHandlers,  
     0, // we dont use high priority handlers in master  
     conf.getBoolean("hbase.rpc.verbose", false), conf,  
     0); // this is a DNC w/o high priority handlers  
So the startup of all these RPC servers is handled by the HBaseServerRPC class.
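
To make the idea of "one server object, several protocols" concrete, here is a toy in-process sketch. It is not how HBaseServerRPC is implemented: there is no network, and ToyRpc, ToyRpcServer, EchoProtocol and AdminLikeProtocol are hypothetical names. It only shows a server registering the protocol interfaces it serves and dispatching calls by reflection, with a java.lang.reflect.Proxy [1] on the calling side.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

class ToyRpc {
    // Hypothetical protocols, standing in for interfaces such as ClientProtocol.
    interface EchoProtocol { String echo(String msg); }
    interface AdminLikeProtocol { int regionCount(); }

    // One server object implements several protocols.
    static class ToyServer implements EchoProtocol, AdminLikeProtocol {
        public String echo(String msg) { return "echo:" + msg; }
        public int regionCount() { return 3; }
    }

    // "Server side": remembers which instance serves which protocol.
    static class ToyRpcServer {
        private final Map<Class<?>, Object> handlers = new HashMap<>();

        ToyRpcServer(Object instance, Class<?>... protocols) {
            for (Class<?> p : protocols) {
                if (!p.isInstance(instance)) {
                    throw new IllegalArgumentException(instance + " does not implement " + p);
                }
                handlers.put(p, instance);
            }
        }

        // Dispatch a call by protocol, method name and parameter types.
        Object call(Class<?> protocol, String methodName,
                    Class<?>[] paramTypes, Object[] args) throws Exception {
            Object target = handlers.get(protocol);
            if (target == null) {
                throw new IllegalArgumentException("Unknown protocol " + protocol);
            }
            Method m = protocol.getMethod(methodName, paramTypes);
            return m.invoke(target, args);
        }
    }

    // "Client side": a dynamic proxy that forwards every call to the server.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> protocol, ToyRpcServer server) {
        return (T) Proxy.newProxyInstance(
            protocol.getClassLoader(),
            new Class<?>[] {protocol},
            (proxy, method, args) ->
                server.call(protocol, method.getName(), method.getParameterTypes(), args));
    }

    public static void main(String[] args) {
        ToyRpcServer server = new ToyRpcServer(
            new ToyServer(), EchoProtocol.class, AdminLikeProtocol.class);
        System.out.println(getProxy(EchoProtocol.class, server).echo("hi"));
        System.out.println(getProxy(AdminLikeProtocol.class, server).regionCount());
    }
}
```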

How does HBaseServerRPC start the RPC server? It works the same way as HBaseClientRPC, as the following figure shows.
Figure 3. How RPC Server starts in HBase

After these servers start, the communication inside the HBase cluster becomes very clear:
Figure 4. How servers communicate inside HBase



... to be continued.



REFERENCE
----------------------
[1] Java Proxy, http://docs.oracle.com/javase/1.4.2/docs/api/java/lang/reflect/Proxy.html
[2] HBase Project, http://hbase.apache.org 


Friday, January 11, 2013

A Strange Java Multi-Thread Case

Today we ran into a nice problem about Java multi-threading, plus a few other Java-related tips. I want to write it down to remind myself to stay curious about whatever strange thing I meet, because it may open a brand new window. OK, first of all, here is the Java code:


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Daemon extends Thread {
    private static final int SIZE = 100;
    private Thread[] t = new Thread[SIZE];
    // Shared loop counter: static, so every thread running run() uses the same i.
    private static volatile int i = 0;

    public Daemon() {
        start();
    }

    public void run() {
        for (i = 0; i < SIZE; i++) {
            t[i] = new DaemonSpawn(i);
            //t[i].start();  [1]
        }
        while (true)
            yield();
    }
}

class DaemonSpawn extends Thread {
    private int n;

    public DaemonSpawn(int i) {
        this.n = i;
        start();
    }

    public void run() {
        System.out.println("DaemonSpawn " + n + " started");
        while (true)
            yield();
    }
}

public class Daemons {
    public static void main(String... args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Daemon());
        //Thread d = new Daemon();  [2]
        //System.out.println("d.isDaemon() = " + d.isDaemon());
    }
}

OK, just as it is: we have a class named Daemons, which creates a new ExecutorService and calls its execute method on a Daemon instance. The Daemon class constructs a series of DaemonSpawn threads and runs them. It looks like simple Java code, and I guessed the result would be a series of unsorted print statements from 0 to 99.

I could not have been more wrong.

The result is a series of unsorted print statements, but with duplicates.

Why?

1, The first lesson I should learn is "NEVER CALL START() IN A THREAD OR RUNNABLE CONSTRUCTOR". The reason is that when you call start() in a thread's constructor, the constructor (or a subclass constructor) may not have finished yet, but the thread is already running because of your start() call. The new thread can then see a partially constructed object, so weird things can happen, and you can never imagine what these things could be.

2, The second tip: the Daemon's run() method is executed twice. The constructor runs only once, but it has already started the Daemon thread with start(); then exec.execute() treats the same Daemon object as a plain Runnable and calls its run() again on a pool thread. Both executions share the static counter i, so you see the duplicated print statements.
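
A small sketch makes the second point measurable. DoubleRunDemo and SelfStarting are hypothetical names for this illustration; the counters record how often the constructor and run() are actually invoked when a self-starting Thread is handed to an executor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DoubleRunDemo {
    static final AtomicInteger constructions = new AtomicInteger();
    static final AtomicInteger runs = new AtomicInteger();

    static class SelfStarting extends Thread {
        SelfStarting() {
            constructions.incrementAndGet();
            start(); // first execution of run(), on this thread itself
        }

        @Override
        public void run() {
            runs.incrementAndGet();
        }
    }

    // Returns {number of constructor calls, number of run() calls}.
    static int[] demo() {
        constructions.set(0);
        runs.set(0);
        ExecutorService exec = Executors.newCachedThreadPool();
        SelfStarting t = new SelfStarting();
        exec.execute(t); // second execution of run(), on a pool worker thread
        exec.shutdown();
        try {
            exec.awaitTermination(5, TimeUnit.SECONDS);
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {constructions.get(), runs.get()};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("constructor calls = " + r[0] + ", run() calls = " + r[1]);
        // prints "constructor calls = 1, run() calls = 2"
    }
}
```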

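If you replace the existing code with either the [1] or the [2] variant, that is, start the threads from outside their constructors or create the Daemon directly instead of passing it to the executor, you get the expected result.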
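
For comparison, here is a minimal sketch of a corrected structure, under a few assumptions of mine: the class names FixedDaemons and Spawn are hypothetical, the spawned threads record their number and exit instead of spinning in yield(), and the loop counter is a local variable rather than a shared static field.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class FixedDaemons {
    static final int SIZE = 100;

    static class Spawn extends Thread {
        private final int n;
        private final List<Integer> seen;

        Spawn(int n, List<Integer> seen) {
            // No start() here: the object is fully constructed before it runs.
            this.n = n;
            this.seen = seen;
        }

        @Override
        public void run() {
            seen.add(n);
        }
    }

    // Starts SIZE threads exactly once each and returns the numbers they recorded.
    static List<Integer> runOnce() {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        Thread[] t = new Thread[SIZE];
        for (int i = 0; i < SIZE; i++) { // local counter, not shared between threads
            t[i] = new Spawn(i, seen);
            t[i].start();                // started by the creator, after construction
        }
        try {
            for (Thread th : t) {
                th.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> seen = runOnce();
        System.out.println("threads reported: " + seen.size());
    }
}
```

The order of the recorded numbers is still unsorted, but each number from 0 to 99 appears exactly once.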


REFERENCE:
----------------------
[1] http://stackoverflow.com/questions/5623285/java-why-not-to-start-a-thread-in-the-constructor-how-to-terminate