ETCD for java_etcd-java使用

论坛 期权论坛 脚本     
已经匿名di用户   2022-5-29 19:21   2230   0

etcd-java使用

简介

项目地址

特性

Accepts/exposes protobuf-generated types directly to minimize overhead(请求头)

Simpler fluent versions of KV primitives for making async or sync requests(fluent api的方式方便制定sync还是async)

Automatic retries and (re)authentication

Resilient(可快速恢复的) and scalable Lease and Watch abstractions with semantics aligned to most typical usage

Underlying stream RPCs only kept open for as long as they are needed

Comes with some helpful utility classes with local cache, persistent key and leader election abstractions

Simple declarative client configuration (optional) - encapsulate per-cluster connection details in a

JSON document

Support for connecting to multi-endpoint IBM Compose etcd deployments over TLS

Currently doesn't cover all parts of the etcd-server API, in particular those related to cluster administration(集群管理) such as maintenance, cluster and auth management

kv操作

获取client

KvStoreClient client = EtcdClient.forEndpoint("localhost", 2379).withPlainText().build();

//KvStoreClient是一个接口,定义了如下三个client方法

KvClient kvClient = client.getKvClient();

LeaseClient leaseClient = client.getLeaseClient();

LockClient lockClient = client.getLockClient();

EtcdClient实现了Closeable接口的close方法,所以可以使用try-resource的语法,自动释放资源。示例代码见ClientBuilderTest

单个endpoint

try (KvStoreClient client = EtcdClient.forEndpoint("localhost", 2379).withPlainText().build()) {}

多个endpoint

try (KvStoreClient client = EtcdClient.forEndpoints(

"localhost:2379,http://localhost:2379,https://localhost:2379,dns:///localhost:2379")

.withPlainText().build()) {}

put操作

basic put

kvc.put(a, v2).sync();

batch put

kvc.batch().put(pr1).put(pr2).sync();

put with lease

//先创建lease

LeaseGrantResponse lgr = leaseClient.grant(2).sync();

//put的时候指定lease id

//如果lease不存在会抛出异常

kvc.put(bs("hello"), bs("world"), lgr.getID()).sync();

put后获取信息

获取上一次的值

如果key没有对应的值,返回的是KeyValue.getDefaultInstance()

如果之前key有对应的指,返回之前key对应的值

kvc.put(a, v2).prevKv().sync().getPrevKv();

获取header信息

kvc.put(a, v2).sync().getHeader();

get操作

basic get

同步get

RangeResponse rr = kvc.get(bs("a")).sync();

异步get

ListenableFuture rrFut1 = kvc.get(bs("new")).async();

rrFut1.get(1000, TimeUnit.SECONDS); //指定超时时间

batch get

TxnResponse txnResponse = kvc.batch().get(kvc.get(a).asRequest()).get(kvc.get(b).asRequest()).sync();

assertEquals(v1, txnResponse.getResponses(0).getResponseRange().getKvs(0).getValue());

assertEquals(v2, txnResponse.getResponses(1).getResponseRange().getKvs(0).getValue());

指定重试策略

ListenableFuture rrFut2 = kvc.get(bs("new"))

.deadline(Deadline.after(20, TimeUnit.SECONDS))

.backoffRetry().async(); // should work

delete操作

basic delete

获取删除的个数

kvc.delete(bs("notthere")).sync().getDeleted()

获取删除的key对应的value

kvc.delete(a).prevKv().sync().getPrevKvs(0).getValue()

batch delete

kvc.batch().delete(kvc.delete(a).asRequest())

.delete(kvc.delete(b).asRequest()).sync();

transaction

ListenableFuture tresp = kvc.txnIf().cmpEqual(a).value(v1)

.and().cmpNotEqual(b).version(10)

.then().put(kvc.put(bs("new"), bs("newval")).asRequest()).async();

lease操作

普通lease

grant

LeaseGrantResponse lgr = leaseClient.grant(5L).sync(); //不指定leaseId

lgr.getID(); //获取id

lgr.getTTL(); //存活时间

leaseClient.grant(10L).leaseId(456L).sync(); //指定leaseId

keep-alive

leaseClient.keepAliveOnce(id).get(1, SECONDS)

revoke

leaseClient.revoke(id).get(1, SECONDS);

assertEquals(-1L, leaseClient.ttl(id).get().getTTL());//撤销后ttl变成-1

persistent lease

basic

PersistentLease pl2 = lc.maintain().minTtl(minTtl).keepAliveFreq(kaFreq).start();

long newLeaseId = pl2.get(20, SECONDS);

with StreamObserver

PersistentLease pl = lc.maintain().minTtl(minTtl).keepAliveFreq(kaFreq)

.start(new StreamObserver() {

@Override

public void onNext(LeaseState value) {

System.out.println(t(start) + "PL state change: " + value);

observerEvents.add(value);

}

@Override

public void onError(Throwable t) {

System.out.println(t(start) + "PL error: " + t);

observerEvents.add(t);

}

@Override

public void onCompleted() {

System.out.println(t(start) + "PL completed");

observerEvents.add(COMPLETED);

}

});

list client lease

需要注意的是persistent lease是基于client的会话的,如果client挂了的话,lease是会自动释放的

Set lidsFound = client2.getLeaseClient().list().get()

.getLeasesList().stream().map(LeaseStatus::getID).collect(Collectors.toSet());

lock操作

同一个客户端lock不同的lease会报错,详细见LockTest.testWithContention

普通lock

LockResponse lr = lockClient.lock(KeyUtils.bs("mylock"))

.deadline(Deadline.after(500, TimeUnit.MILLISECONDS)).sync(); //获取锁

assertTrue(kvClient.txnIf().exists(lr.getKey()).sync().getSucceeded());

assertNotNull(lockClient.unlock(lockKey).sync()); //释放锁

assertFalse(kvClient.txnIf().exists(lockKey).sync().getSucceeded());

lock使用lease

LockResponse lr = lockClient.lock(KeyUtils.bs("mylock2")).withLease(lgr.getID()).sync();

lock使用persistent lease

PersistentLease pl = lec.maintain().start();

LockResponse lr = lockClient.lock(KeyUtils.bs("mylock3")).withLease(pl).sync();

//如果pl.close后会自动释放

watch操作

简单watch

WatchIterator watchIterator = kvc.watch(bs("/watchtest")).asPrefix().start();

watch指定observer

//创建一个observer

final StreamObserver observer = new StreamObserver() {

@Override

public void onNext(WatchUpdate value) {

System.out.println(t(start) + "watch event: " + value);

watchEvents.add(value);

}

@Override

public void onError(Throwable t) {

System.out.println(t(start) + "watch error: " + t);

watchEvents.add(t);

}

@Override

public void onCompleted() {

System.out.println(t(start) + "watch completed");

watchEvents.add(COMPLETED);

}

};

//启动watcher,并且绑定observer

Watch watch = kvc.watch(bs("/watchtest")).asPrefix().start(observer);

获取watch的事件

WatchUpdate wu = getNextSkipEmpty(watchEvents);

assertNotNull(wu);

assertEquals("event: " + wu, 1, wu.getEvents().size()); //事件个数

assertEquals(EventType.PUT, wu.getEvents().get(0).getType()); //事件类型

assertEquals(bs("a value"), wu.getEvents().get(0).getKv().getValue()); //值

assertEquals(bs("/watchtest/a"), wu.getEvents().get(0).getKv().getKey()); //键

election

简单的case参见示例代码LeaderElectionTest

源码浅析

EtcdClient

b5477f34eca0

KvStoreClient.png

重点方法是line397行,EtcdClient的构造方法,初始化了internalExecutor,channel,grpc,kvClient等

EtcdKvClient

etcd kv操作和watch操作的核心类,实现了KvClient接口,其他的leaseClient和lockClient类似

b5477f34eca0

EtcdKvClient.png

重点关注AbstractFluentRequest,底层的client通过grpc进行远程rpc调用。

所有的操作在底层抽象为sync和async的操作,sync操作其实也是async操作,只是在client层做了get操作。

@Override

public final ListenableFuture async() {

return async(null);

}

@Override

public final RespT sync() {

return client.waitForCall(this::async);

}

GrpcClient

grpc调用的封装类, 底层调用方法

ClientCalls.futureUnaryCall(channel.newCall(method, callOptions), request)

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:81
帖子:4969
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP