zkWatcher

zkWatcher

三月 20, 2020

直接上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package aamen.test.zookeeperTest;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;


/**
* @author aamen
* @Title:
* @Description:
* @Version:
* @date 2020/3/19 6:32 下午
*/
public class DemoWatcher implements Watcher {
private ZooKeeper zookeeper;
private static final String CONNECT_PATH = "118.190.40.252:2181,118.190.40.252:2182,118.190.40.252:2183";

// 超时时间
private static final int SESSION_TIME_OUT = 20000;

private static final CountDownLatch countDownLatch = new CountDownLatch(1);

private ZooKeeper zk = null;

//用来计数,判断调用了几次watcher方法
private static final AtomicInteger cnt = new AtomicInteger(0);

//设定编码
private static final Charset CHARSET = Charset.forName("UTF-8");

public static void main(String[] args) throws Exception {
DemoWatcher demo = new DemoWatcher();

//获取连接
demo.createConnection(CONNECT_PATH, SESSION_TIME_OUT);

//睡眠
Thread.sleep(1000);

//使用默认的watcher, watcher只执行一次,不是一直执行 ,exists 没有触发事件
Stat stat = demo.zk.exists("/watcher", true);
if (stat == null) {
String path = demo.zk.create("/watcher", //节点
"watcher".getBytes(CHARSET), //节点内容
Ids.OPEN_ACL_UNSAFE, //所有热都可以访问
CreateMode.EPHEMERAL); //临时节点
System.out.println(path);
}

//更新了数据,而且设定了监听watch
demo.zk.exists("/watcher", true);
// demo.zk.setData("/watcher", "update".getBytes(), -1);
Thread.sleep(Long.MAX_VALUE);

}

/**
* 设定watcher处理的方法
*/
public void process(WatchedEvent event) {
if (event == null) {
return;
}

KeeperState state = event.getState();
EventType type = event.getType();

//受影响的节点
String path = event.getPath();

System.out.println("---------------------Watcher调用次数\t" + cnt.incrementAndGet() + "---------------------");
System.out.println("path\t" + path);

System.out.println("状态\t" + state);
System.out.println("事件类型\t" + type);
//判断是否连接了
if (state == KeeperState.SyncConnected) {


switch (event.getType()) {
case None:
countDownLatch.countDown();
System.out.println("事件\t连接成功");
break;
case NodeCreated:
System.out.println("事件\t节点创建");
break;
case NodeChildrenChanged:
System.out.println("事件\t节点创建");
break;
case NodeDeleted:
System.out.println("事件\t节点删除");
break;
case NodeDataChanged:
System.out.println("事件\t节点变更");
break;
default:
break;
}
}
// 节点发生变化之后重新监听
try {
this.zk.exists("/watcher", true);
} catch (Exception e) {
e.printStackTrace();
}


System.out.println("--------------------------------------------------");

}

public void createConnection(String path, int timeOut) {

try {
//关闭连接
this.closeConnection();

//获取连接,设定监听的Watch
zk = new ZooKeeper(path, timeOut, this);

//等待
countDownLatch.await();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

/**
* 关闭连接
* 创建日期:2017年10月14日<br/>
* 创建用户:yellowcong<br/>
* 功能描述:
*/
public void closeConnection() {
try {
if (zk != null) {
zk.close();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}