167、HBase观察者协处理器示例

示例

HBase提供了Observer Coprocessor(观察者协处理器)的示例。

下面给出更详细的例子。

这些示例假设一个名为users的表,其中有两个列族personalDet和salaryDet,包含个人和工资详细信息。下面是users表格:

`` ``
personalDet salaryDet
jverne

Jules

Verne

02/08/1828

12000

9000

3000

rowkey

name

lastname

dob

gross

net

allowances

admin

Admin

Admin

cdickens

Charles

Dickens

02/07/1812

10000

8000

2000

观察者示例

以下Observer协处理器可防止用户admin的详细信息在users表Get或者Scan中返回。

1、 编写一个实现RegionObserver类的类;
2、 重写preGetOp()方法(不推荐使用该preGet()方法)以检查客户端是否已使用admin值查询rowkey如果是,则返回空结果否则,正常处理请求;
3、 将您的代码和依赖项放在JAR文件中;
4、 将JAR放在HDFS中,HBase可以在其中找到它;
5、 加载协处理器;
6、 写一个简单的程序来测试它;

以下是上述步骤的实现:

public class RegionObserverExample implements RegionObserver {

    private static final byte[] ADMIN = Bytes.toBytes("admin");
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details");
    private static final byte[] COLUMN = Bytes.toBytes("Admin_det");
    private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details");

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, final List<Cell> results)
    throws IOException {

        if (Bytes.equals(get.getRow(),ADMIN)) {
            Cell c = CellUtil.createCell(get.getRow(),COLUMN_FAMILY, COLUMN,
            System.currentTimeMillis(), (byte)4, VALUE);
            results.add(c);
            e.bypass();
        }
    }
}

重写preGetOp()仅适用于Get操作。您还需要重写该preScannerOpen()方法以从扫描结果中过滤admin行。

@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,
final RegionScanner s) throws IOException {

    Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(ADMIN));
    scan.setFilter(filter);
    return s;
}

这种方法有效,但有副作用。

如果客户端在其扫描中使用了过滤器,则该过滤器将替换该过滤器。相反,您可以显式删除扫描中的任何admin结果:

@Override
public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s,
final List<Result> results, final int limit, final boolean hasMore) throws IOException {
        Result result = null;
    Iterator<Result> iterator = results.iterator();
    while (iterator.hasNext()) {
    result = iterator.next();
        if (Bytes.equals(result.getRow(), ROWKEY)) {
            iterator.remove();
            break;
        }
    }
    return hasMore;
}

端点示例

仍然使用该users表,该示例使用端点协处理器实现协处理器以计算所有员工工资的总和。

1、 创建一个定义服务的’.proto’文件;

    option java_package = "org.myname.hbase.coprocessor.autogenerated";
    option java_outer_classname = "Sum";
    option java_generic_services = true;
    option java_generate_equals_and_hash = true;
    option optimize_for = SPEED;
    message SumRequest {
        required string family = 1;
        required string column = 2;
    }

    message SumResponse {
      required int64 sum = 1 [default = 0];
    }

    service SumService {
      rpc getSum(SumRequest)
        returns (SumResponse); 
    }

2、 执行protoc命令以从上面的.proto文件生成Java代码;

    $ mkdir src
    $ protoc --java_out=src ./sum.proto

这将生成一个类调用Sum.java。

3、 编写一个扩展生成的服务类的类,实现Coprocessor和CoprocessorService类,并重写服务方法;
**注意:**如果您从hbase-site.xml加载协处理器然后使用HBase Shell 再次加载同一个协处理器,它将再次加载。同一个类将存在两次,第二个实例将具有更高的ID(因此具有更低的优先级)。结果是有效地忽略了重复的协处理器。

    public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService {

        private RegionCoprocessorEnvironment env;

        @Override
        public Service getService() {
            return this;
        }

        @Override
        public void start(CoprocessorEnvironment env) throws IOException {
            if (env instanceof RegionCoprocessorEnvironment) {
                this.env = (RegionCoprocessorEnvironment)env;
            } else {
                throw new CoprocessorException("Must be loaded on a table region!");
            }
        }

        @Override
        public void stop(CoprocessorEnvironment env) throws IOException {
            // do nothing
        }

        @Override
        public void getSum(RpcController controller, Sum.SumRequest request, RpcCallback<Sum.SumResponse> done) {
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes(request.getFamily()));
            scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));

            Sum.SumResponse response = null;
            InternalScanner scanner = null;

            try {
                scanner = env.getRegion().getScanner(scan);
                List<Cell> results = new ArrayList<>();
                boolean hasMore = false;
                long sum = 0L;

                do {
                    hasMore = scanner.next(results);
                    for (Cell cell : results) {
                        sum = sum + Bytes.toLong(CellUtil.cloneValue(cell));
                    }
                    results.clear();
                } while (hasMore);

                response = Sum.SumResponse.newBuilder().setSum(sum).build();
            } catch (IOException ioe) {
                ResponseConverter.setControllerException(controller, ioe);
            } finally {
                if (scanner != null) {
                    try {
                        scanner.close();
                    } catch (IOException ignored) {}
                }
            }

            done.run(response);
        }
    }
    Configuration conf = HBaseConfiguration.create();
    Connection connection = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf("users");
    Table table = connection.getTable(tableName);

    final Sum.SumRequest request = Sum.SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build();
    try {
        Map<byte[], Long> results = table.coprocessorService(
            Sum.SumService.class,
            null,  /* start key */
            null,  /* end   key */
            new Batch.Call<Sum.SumService, Long>() {
                @Override
                public Long call(Sum.SumService aggregate) throws IOException {
                    BlockingRpcCallback<Sum.SumResponse> rpcCallback = new BlockingRpcCallback<>();
                    aggregate.getSum(null, request, rpcCallback);
                    Sum.SumResponse response = rpcCallback.get();

                    return response.hasSum() ? response.getSum() : 0L;
                }
            }
        );

        for (Long sum : results.values()) {
            System.out.println("Sum = " + sum);
        }
    } catch (ServiceException e) {
        e.printStackTrace();
    } catch (Throwable e) {
        e.printStackTrace();
    }

4、 加载协处理器;
5、 编写客户端代码以调用协处理器;