Using etcd-java
Introduction
Project repository
Features
Accepts/exposes protobuf-generated types directly to minimize overhead
Simpler fluent versions of KV primitives for making async or sync requests (the fluent API makes it easy to choose between sync and async)
Automatic retries and (re)authentication
Resilient and scalable Lease and Watch abstractions with semantics aligned to most typical usage
Underlying stream RPCs only kept open for as long as they are needed
Comes with some helpful utility classes with local cache, persistent key and leader election abstractions
Simple declarative client configuration (optional) - encapsulate per-cluster connection details in a JSON document
Support for connecting to multi-endpoint IBM Compose etcd deployments over TLS
Currently doesn't cover all parts of the etcd-server API, in particular those related to cluster administration such as maintenance, cluster and auth management
KV operations
Obtaining a client
KvStoreClient client = EtcdClient.forEndpoint("localhost", 2379).withPlainText().build();
// KvStoreClient is an interface that exposes the following three client accessors
KvClient kvClient = client.getKvClient();
LeaseClient leaseClient = client.getLeaseClient();
LockClient lockClient = client.getLockClient();
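EtcdClient implements the close method of the Closeable interface, so it can be used with try-with-resources and its resources are released automatically. See ClientBuilderTest for sample code.
Single endpoint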
try (KvStoreClient client = EtcdClient.forEndpoint("localhost", 2379).withPlainText().build()) {}
Multiple endpoints
try (KvStoreClient client = EtcdClient.forEndpoints(
"localhost:2379,http://localhost:2379,https://localhost:2379,dns:///localhost:2379")
.withPlainText().build()) {}
put operations
basic put
kvc.put(a, v2).sync();
batch put
kvc.batch().put(pr1).put(pr2).sync();
put with lease
// create the lease first
LeaseGrantResponse lgr = leaseClient.grant(2).sync();
// pass the lease ID when putting
// an exception is thrown if the lease does not exist
kvc.put(bs("hello"), bs("world"), lgr.getID()).sync();
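Retrieving information after a put
Getting the previous value
If the key had no value before, KeyValue.getDefaultInstance() is returned
If the key already had a value, that previous value is returned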
kvc.put(a, v2).prevKv().sync().getPrevKv();
Getting the header
kvc.put(a, v2).sync().getHeader();
get operations
basic get
Synchronous get
RangeResponse rr = kvc.get(bs("a")).sync();
Asynchronous get
ListenableFuture<RangeResponse> rrFut1 = kvc.get(bs("new")).async();
rrFut1.get(1000, TimeUnit.SECONDS); // wait for the result with a timeout
batch get
TxnResponse txnResponse = kvc.batch().get(kvc.get(a).asRequest()).get(kvc.get(b).asRequest()).sync();
assertEquals(v1, txnResponse.getResponses(0).getResponseRange().getKvs(0).getValue());
assertEquals(v2, txnResponse.getResponses(1).getResponseRange().getKvs(0).getValue());
Specifying a retry strategy
ListenableFuture<RangeResponse> rrFut2 = kvc.get(bs("new"))
.deadline(Deadline.after(20, TimeUnit.SECONDS))
.backoffRetry().async(); // should work
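The combination of a deadline and retry with backoff can be pictured with plain JDK code. The sketch below is a generic illustration of the idea only, not etcd-java's implementation; BackoffRetrySketch, callWithBackoff and all of its parameters are hypothetical names introduced here.

```java
import java.time.Duration;
import java.util.function.Supplier;

final class BackoffRetrySketch {
    /**
     * Retries op until it succeeds or the deadline passes, doubling the
     * pause between attempts (capped at maxDelay). Hypothetical helper.
     */
    static <T> T callWithBackoff(Supplier<T> op, Duration deadline,
                                 Duration initialDelay, Duration maxDelay) {
        long deadlineNanos = System.nanoTime() + deadline.toNanos();
        long delayMillis = Math.max(1, initialDelay.toMillis());
        long maxDelayMillis = Math.max(1, maxDelay.toMillis());
        while (true) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                long remainingMillis = (deadlineNanos - System.nanoTime()) / 1_000_000;
                if (remainingMillis <= 0) {
                    throw e; // deadline exhausted: surface the last failure
                }
                try {
                    // never sleep past the deadline
                    Thread.sleep(Math.min(delayMillis, remainingMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
                delayMillis = Math.min(delayMillis * 2, maxDelayMillis);
            }
        }
    }
}
```

The deadline bounds the total time spent, while the backoff only controls how the attempts are spaced inside that window.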
delete operations
basic delete
Getting the number of deleted keys
kvc.delete(bs("notthere")).sync().getDeleted();
Getting the value of the deleted key
kvc.delete(a).prevKv().sync().getPrevKvs(0).getValue();
batch delete
kvc.batch().delete(kvc.delete(a).asRequest())
.delete(kvc.delete(b).asRequest()).sync();
transaction
ListenableFuture<TxnResponse> tresp = kvc.txnIf().cmpEqual(a).value(v1)
.and().cmpNotEqual(b).version(10)
.then().put(kvc.put(bs("new"), bs("newval")).asRequest()).async();
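To make the if/then semantics of the transaction above concrete, here is a minimal in-memory model: a guard is evaluated and the put is applied atomically only when the guard holds. This is a conceptual sketch under stated assumptions, not how etcd or etcd-java is implemented; TxnModelSketch and its methods are hypothetical, and a missing key is modelled with version 0.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

final class TxnModelSketch {
    /** Value plus a per-key version counter, loosely mirroring etcd's KeyValue. */
    static final class Entry {
        final String value;
        final long version;

        Entry(String value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    /** Plain put: the version starts at 1 and grows with every write to the key. */
    synchronized void put(String key, String value) {
        Entry old = store.get(key);
        store.put(key, new Entry(value, old == null ? 1 : old.version + 1));
    }

    synchronized Entry get(String key) {
        return store.get(key);
    }

    /** Comparison on the value; false when the key is absent. */
    synchronized boolean valueEquals(String key, String expected) {
        Entry e = store.get(key);
        return e != null && e.value.equals(expected);
    }

    /** Comparison on the version; an absent key counts as version 0. */
    synchronized boolean versionNotEquals(String key, long version) {
        Entry e = store.get(key);
        return (e == null ? 0 : e.version) != version;
    }

    /** Applies the put only if the guard holds; returns whether it held. */
    synchronized boolean putIf(Predicate<TxnModelSketch> guard, String key, String value) {
        if (!guard.test(this)) {
            return false;
        }
        put(key, value);
        return true;
    }
}
```

In this model the example transaction would read as putIf(s -> s.valueEquals("a", "v1") && s.versionNotEquals("b", 10), "new", "newval").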
lease operations
Regular lease
grant
LeaseGrantResponse lgr = leaseClient.grant(5L).sync(); // no lease ID specified
lgr.getID(); // get the lease ID
lgr.getTTL(); // time to live
leaseClient.grant(10L).leaseId(456L).sync(); // with an explicit lease ID
keep-alive
leaseClient.keepAliveOnce(id).get(1, SECONDS);
revoke
leaseClient.revoke(id).get(1, SECONDS);
assertEquals(-1L, leaseClient.ttl(id).get().getTTL()); // the TTL becomes -1 after revocation
persistent lease
basic
PersistentLease pl2 = lc.maintain().minTtl(minTtl).keepAliveFreq(kaFreq).start();
long newLeaseId = pl2.get(20, SECONDS);
with StreamObserver
PersistentLease pl = lc.maintain().minTtl(minTtl).keepAliveFreq(kaFreq)
.start(new StreamObserver<LeaseState>() {
@Override
public void onNext(LeaseState value) {
System.out.println(t(start) + "PL state change: " + value);
observerEvents.add(value);
}
@Override
public void onError(Throwable t) {
System.out.println(t(start) + "PL error: " + t);
observerEvents.add(t);
}
@Override
public void onCompleted() {
System.out.println(t(start) + "PL completed");
observerEvents.add(COMPLETED);
}
});
list client lease
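Note that a persistent lease is tied to the client's session: if the client goes down, keep-alives stop and the lease is released automatically once its TTL runs out
Set<Long> lidsFound = client2.getLeaseClient().list().get()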
.getLeasesList().stream().map(LeaseStatus::getID).collect(Collectors.toSet());
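The lease lifecycle described in this section (grant, keep-alive, revoke, and expiry when keep-alives stop) can be modelled with a manual clock. The sketch below is a simplified illustration, not the etcd server's logic; LeaseModelSketch and its methods are hypothetical, and it mirrors the -1 TTL reported above for a revoked lease.

```java
import java.util.HashMap;
import java.util.Map;

final class LeaseModelSketch {
    private final Map<Long, Long> expiryByLease = new HashMap<>(); // lease ID -> expiry time (s)
    private final Map<Long, Long> ttlByLease = new HashMap<>();    // lease ID -> granted TTL (s)
    private long now;                                              // manual clock, in seconds
    private long nextId = 1;

    /** Grants a lease that expires ttlSeconds from now unless kept alive. */
    long grant(long ttlSeconds) {
        long id = nextId++;
        ttlByLease.put(id, ttlSeconds);
        expiryByLease.put(id, now + ttlSeconds);
        return id;
    }

    /** Resets the expiry to a full TTL; returns false if the lease is already gone. */
    boolean keepAlive(long id) {
        if (remainingTtl(id) < 0) {
            return false;
        }
        expiryByLease.put(id, now + ttlByLease.get(id));
        return true;
    }

    void revoke(long id) {
        expiryByLease.remove(id);
        ttlByLease.remove(id);
    }

    /** Remaining TTL in seconds, or -1 once the lease is revoked or expired. */
    long remainingTtl(long id) {
        Long expiry = expiryByLease.get(id);
        if (expiry == null || expiry <= now) {
            return -1;
        }
        return expiry - now;
    }

    /** Moves the manual clock forward, standing in for time passing. */
    void advance(long seconds) {
        now += seconds;
    }
}
```

A client that crashes simply stops calling keepAlive, so after advance(ttl) the model reports -1, which is the behaviour the note about persistent leases relies on.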
lock operations
Locking with different leases from the same client raises an error; see LockTest.testWithContention for details
Regular lock
LockResponse lr = lockClient.lock(KeyUtils.bs("mylock"))
.deadline(Deadline.after(500, TimeUnit.MILLISECONDS)).sync(); // acquire the lock
assertTrue(kvClient.txnIf().exists(lr.getKey()).sync().getSucceeded());
assertNotNull(lockClient.unlock(lr.getKey()).sync()); // release the lock
assertFalse(kvClient.txnIf().exists(lr.getKey()).sync().getSucceeded());
Lock with a lease
LockResponse lr = lockClient.lock(KeyUtils.bs("mylock2")).withLease(lgr.getID()).sync();
Lock with a persistent lease
PersistentLease pl = lec.maintain().start();
LockResponse lr = lockClient.lock(KeyUtils.bs("mylock3")).withLease(pl).sync();
// the lock is released automatically once pl is closed
watch operations
Simple watch
WatchIterator watchIterator = kvc.watch(bs("/watchtest")).asPrefix().start();
Watch with an observer
// create an observer
final StreamObserver<WatchUpdate> observer = new StreamObserver<WatchUpdate>() {
@Override
public void onNext(WatchUpdate value) {
System.out.println(t(start) + "watch event: " + value);
watchEvents.add(value);
}
@Override
public void onError(Throwable t) {
System.out.println(t(start) + "watch error: " + t);
watchEvents.add(t);
}
@Override
public void onCompleted() {
System.out.println(t(start) + "watch completed");
watchEvents.add(COMPLETED);
}
};
// start the watch and attach the observer
Watch watch = kvc.watch(bs("/watchtest")).asPrefix().start(observer);
Reading watch events
WatchUpdate wu = getNextSkipEmpty(watchEvents);
assertNotNull(wu);
assertEquals("event: " + wu, 1, wu.getEvents().size()); // number of events
assertEquals(EventType.PUT, wu.getEvents().get(0).getType()); // event type
assertEquals(bs("a value"), wu.getEvents().get(0).getKv().getValue()); // value
assertEquals(bs("/watchtest/a"), wu.getEvents().get(0).getKv().getKey()); // key
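The snippets above rely on a watchEvents queue and a getNextSkipEmpty helper that are not shown. One way such a hand-off between the observer callbacks and the reading thread could look is sketched below with JDK types only. Everything here is an assumption for illustration: WatchQueueSketch, its Observer interface (a stand-in with the same three callbacks as StreamObserver) and nextNonEmpty are hypothetical, and an update is modelled as a plain list of strings.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class WatchQueueSketch {
    /** Hypothetical stand-in with the same three callbacks as StreamObserver. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Marker object queued when the stream completes. */
    static final Object COMPLETED = new Object();

    /** Holds updates, errors and the completion marker in arrival order. */
    final BlockingQueue<Object> events = new LinkedBlockingQueue<>();

    /** Observer that only records what it sees; the reader drains the queue. */
    Observer<List<String>> observer() {
        return new Observer<List<String>>() {
            @Override public void onNext(List<String> update) { events.add(update); }
            @Override public void onError(Throwable t) { events.add(t); }
            @Override public void onCompleted() { events.add(COMPLETED); }
        };
    }

    /** Next update with at least one event, or null on timeout, error or completion. */
    @SuppressWarnings("unchecked")
    List<String> nextNonEmpty(long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            while (true) {
                Object next = events.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
                if (!(next instanceof List)) {
                    return null; // nothing arrived in time, or the stream ended
                }
                List<String> update = (List<String>) next;
                if (!update.isEmpty()) {
                    return update;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

Keeping the callbacks free of assertions and doing all checks on the reading side keeps failures on the test thread, which is presumably why the original tests queue events rather than assert inside the observer.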
election
For a simple case, see the sample code in LeaderElectionTest
A brief look at the source code
EtcdClient

[Figure: KvStoreClient.png]
The key method is the EtcdClient constructor at line 397, which initializes internalExecutor, channel, grpc, kvClient and so on
EtcdKvClient
The core class for etcd KV and watch operations. It implements the KvClient interface; the lease and lock clients are structured similarly

[Figure: EtcdKvClient.png]
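Pay particular attention to AbstractFluentRequest: the underlying client makes its remote calls over gRPC.
At the lowest level every operation is expressed as either a sync or an async call. A sync call is really an async call as well; the client layer simply waits on (gets) the result of the future.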
@Override
public final ListenableFuture<RespT> async() {
return async(null);
}
@Override
public final RespT sync() {
return client.waitForCall(this::async);
}
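The same "sync is async plus a blocking wait" shape can be reproduced with the JDK's CompletableFuture. This is a minimal sketch of the pattern, not the library's code: SyncOverAsyncSketch and its remoteCall supplier are hypothetical stand-ins for the request object and the RPC, and CompletableFuture stands in for ListenableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Stand-in request type whose sync() is built purely on top of async(). */
final class SyncOverAsyncSketch {
    private final Supplier<String> remoteCall; // hypothetical stand-in for the RPC

    SyncOverAsyncSketch(Supplier<String> remoteCall) {
        this.remoteCall = remoteCall;
    }

    /** Starts the call on another thread and returns immediately. */
    CompletableFuture<String> async() {
        return CompletableFuture.supplyAsync(remoteCall);
    }

    /** Issues the same async call, then blocks the caller until it finishes. */
    String sync(long timeout, TimeUnit unit) {
        try {
            return async().get(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("call failed or timed out", e);
        }
    }
}
```

With a single async code path, features such as retries and deadlines only need to be implemented once and the blocking variant inherits them.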
GrpcClient
Wrapper class for gRPC calls; the underlying call is
ClientCalls.futureUnaryCall(channel.newCall(method, callOptions), request)