HBase 协处理器

Submitted by Lizhe on Thu, 05/04/2017 - 23:46

协处理器可以将一部分运算工作移动到数据的存放端

协处理器允许用户在region服务器上运行自己的代码, 更准确地说是让用户可以在region上进行具体操作并且可以使用类似RDBMS数据库中trigger的功能

1 配置文件式 hbase-site.xml

    <property>
        <name>hbase.coprocessor.region.classes</name>
        <value>hbase.observer.RegionObserverExample</value>
    </property>

2 命令行式

删除

alter 'test', METHOD => 'table_att_unset',NAME => 'coprocessor$1'

添加

alter 'test','coprocessor'=>'file:/download/hbasejars/hadoop.sort.jar|hbase.observer.RegionObserverExample|210002|'

3 java 代码式

Path path = new Path("/share/hbaseone/lzlib/lzhbase-0.0.1-SNAPSHOT.jar");
tableDesc.setValue("HelloworldRegionObserver", "file:"+path.toString()+"|"+hbase.observer.RegionObserverExample.class.getCanonicalName()+"|"+Coprocessor.PRIORITY_USER);

 

package hbase.helloworld;

/**
 * 
 */

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseHelloworld {

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        String tableName = "test";
        String columnFamily = "cf";
        try {

            if (true == HBaseHelloworld.delete(tableName)) {
                System.out.println("Delete Table " + tableName + " success!");

            }

            HBaseHelloworld.create(tableName, columnFamily);
            HBaseHelloworld.put(tableName, "row1", columnFamily, "column1",
                    "data1");
            HBaseHelloworld.put(tableName, "row2", columnFamily, "column2",
                    "data2");
            HBaseHelloworld.put(tableName, "row3", columnFamily, "column3",
                    "data3");
            HBaseHelloworld.put(tableName, "row4", columnFamily, "column4",
                    "data4");
            HBaseHelloworld.put(tableName, "row5", columnFamily, "column5",
                    "data5");

            HBaseHelloworld.get(tableName, "row1");

            HBaseHelloworld.scan(tableName);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static Configuration conf = HBaseConfiguration.create();
    static {
        conf.set("hbase.zookeeper.quorum", "vagrant");
    }

    public static void create(String tableName, String columnFamily)
            throws Exception {
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (admin.tableExists(tableName)) {
            System.out.println(tableName + " exists!");
        } else {
            HTableDescriptor tableDesc = new HTableDescriptor(tableName);
            tableDesc.addFamily(new HColumnDescriptor(columnFamily));
            

//            FileSystem fs = FileSystem.get(conf);
//            Path path = new Path(fs.getUri()+Path.SEPARATOR+"filename.jar");
            Path path = new Path("/share/hbaseone/lzlib/lzhbase-0.0.1-SNAPSHOT.jar");
            tableDesc.setValue("HelloworldRegionObserver", "file:"+path.toString()+"|"+hbase.observer.RegionObserverExample.class.getCanonicalName()+"|"+Coprocessor.PRIORITY_USER);
            
            admin.createTable(tableDesc);
            System.out.println(tableName + " create successfully!");
        }
    }

    public static void put(String tablename, String row, String columnFamily,
            String column, String data) throws Exception {

        HTable table = new HTable(conf, tablename);
        Put put = new Put(Bytes.toBytes(row));

        put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column),
                Bytes.toBytes(data));

        table.put(put);

        System.out.println("put '" + row + "', '" + columnFamily + ":" + column
                + "', '" + data + "'");

    }

    public static void get(String tablename, String row) throws Exception {
        HTable table = new HTable(conf, tablename);
        Get get = new Get(Bytes.toBytes(row));
        Result result = table.get(get);
        System.out.println("Get: " + result);
    }

    public static void scan(String tableName) throws Exception {

        HTable table = new HTable(conf, tableName);
        Scan s = new Scan();
        ResultScanner rs = table.getScanner(s);

        for (Result r : rs) {
            System.out.println("Scan: " + r);

        }
    }

    public static boolean delete(String tableName) throws IOException {

        HBaseAdmin admin = new HBaseAdmin(conf);
        if (admin.tableExists(tableName)) {
            try {
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }
        return true;
    }
}

package hbase.observer;

import java.io.IOException;
import java.util.List;
 
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
 
public class RegionObserverExample extends BaseRegionObserver {
    public static final Log LOG = LogFactory.getLog(HRegion.class);
    public static final byte[] FIXED_ROW = Bytes.toBytes("@@@GETTIME@@@");
 
    public void preGet(ObserverContext<RegionCoprocessorEnvironment> c,
            Get get, List<KeyValue> result) throws IOException {
        LOG.debug("Got preGet for row: " + Bytes.toStringBinary(get.getRow()));
         
        if (Bytes.equals(get.getRow(), FIXED_ROW)) {
            KeyValue kv = new KeyValue(get.getRow(), FIXED_ROW, FIXED_ROW,
                    Bytes.toBytes(System.currentTimeMillis()));
            LOG.debug("Had a match, adding fake kv: " + kv);
            result.add(kv);
        }
    }
}

 

package hbase.observer;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class LoadWithTableDescriptorExample {  
    
    static Configuration conf = HBaseConfiguration.create();
    static {
        conf.set("hbase.rootdir","hdfs://lzvm:9000/jars" );
        conf.set("hbase.zookeeper.quorum", "lzvm");
    }
          
    public static void main(String[] args) throws IOException  
    {  
 
        HTableDescriptor htd = new HTableDescriptor("testtable");  
        htd.addFamily(new HColumnDescriptor("colfam1"));  
        //  
        htd.setValue("COPROCESSOR$1", "hdfs://lzvm:9000/jars/hadoop.sort-0.0.1-SNAPSHOT.jar" +  
                "|" + RegionObserverExample.class.getCanonicalName() +  
                "|" + Coprocessor.PRIORITY_USER);  
        //  
        HBaseAdmin admin = new HBaseAdmin(conf);  
          
        admin.createTable(htd);  
          
        System.out.println("end");  
    }  
}