/**
 * Command-line entry point that publishes a single task node under {@code /tasks}
 * in ZooKeeper and reports the path that was created.
 */
public class JobDispatcher {
    /**
     * Creates one task node carrying the payload {@code "taskData"}, prints the
     * resulting path, and closes the ZooKeeper session before returning.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ZooKeeper zooKeeper = createZooKeeper();
        try {
            String taskNode = createTaskNode(zooKeeper, "taskData");
            if (taskNode == null) {
                // createTaskNode already printed the cause; avoid a misleading "Task created: null".
                System.err.println("Task creation failed");
            } else {
                System.out.println("Task created: " + taskNode);
            }
        } finally {
            closeQuietly(zooKeeper);
        }
    }

    /**
     * Builds the ZooKeeper client handle used by {@link #main(String[])}.
     *
     * @return the client handle
     */
    private static ZooKeeper createZooKeeper() {
        // ...
    }

    /**
     * Creates a persistent sequential node with the prefix {@code /tasks/task-}.
     *
     * <p>The payload is encoded as UTF-8 so that readers on hosts with a different
     * default charset decode the same text. ZooKeeper requires the parent node
     * ({@code /tasks}) to exist already; if it does not, the create call fails and
     * this method returns {@code null}.
     *
     * @param zooKeeper client handle used to issue the create request
     * @param taskData  text payload stored in the new node
     * @return the actual path of the created node (prefix plus the sequence suffix
     *         assigned by the server), or {@code null} if creation failed
     */
    private static String createTaskNode(ZooKeeper zooKeeper, String taskData) {
        try {
            byte[] payload = taskData.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            return zooKeeper.create(
                    "/tasks/task-",
                    payload,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT_SEQUENTIAL);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /** Closes the session if one was opened, preserving the thread's interrupt status. */
    private static void closeQuietly(ZooKeeper zooKeeper) {
        if (zooKeeper == null) {
            return;
        }
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Watches the children of {@code /tasks} in ZooKeeper and prints the payload of
 * every task node found there.
 */
public class JobExecutor implements Watcher {
    private ZooKeeper zooKeeper;

    /**
     * Connects to ZooKeeper and performs the first task listing, which also
     * registers the child watch that drives later listings.
     *
     * <p>NOTE(review): this method returns right after the first listing; whether
     * the process stays alive to receive watch events depends on what
     * {@code initZooKeeper} sets up — confirm.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        JobExecutor executor = new JobExecutor();
        executor.initZooKeeper();
        executor.listenForTask();
    }

    /** Initializes the {@code zooKeeper} client handle used by this executor. */
    private void initZooKeeper() {
        // ...
    }

    /**
     * Lists the children of {@code /tasks}, re-registering this object as the
     * child watcher, and prints the UTF-8 decoded payload of each child.
     *
     * <p>No data watch is set on the individual task nodes: {@link #process}
     * only reacts to {@code NodeChildrenChanged}, so such watches would fire
     * events that are immediately discarded.
     *
     * <p>NOTE(review): every invocation reads all current children again, so a
     * task that is still present is printed once per child-change event —
     * confirm whether the elided processing step removes handled tasks.
     */
    private void listenForTask() {
        try {
            List<String> taskNodes = zooKeeper.getChildren("/tasks", this);
            for (String taskNode : taskNodes) {
                byte[] taskData = zooKeeper.getData("/tasks/" + taskNode, false, null);
                // getData returns null for a node created without data; treat that as an empty payload.
                String payload = (taskData == null)
                        ? ""
                        : new String(taskData, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Task received: " + payload);
                // ...
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Watch callback: re-lists the tasks whenever the children of the watched
     * node change. All other event types are ignored.
     *
     * @param event the event delivered by the ZooKeeper client
     */
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
            listenForTask();
        }
    }
}
/**
 * Watches the children of {@code /results} in ZooKeeper and prints the payload
 * of every result node found there.
 */
public class JobResultViewer implements Watcher {
    private ZooKeeper zooKeeper;

    /**
     * Connects to ZooKeeper and performs the first result listing, which also
     * registers the child watch that drives later listings.
     *
     * <p>NOTE(review): this method returns right after the first listing; whether
     * the process stays alive to receive watch events depends on what
     * {@code initZooKeeper} sets up — confirm.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        JobResultViewer viewer = new JobResultViewer();
        viewer.initZooKeeper();
        viewer.listenForResult();
    }

    /** Initializes the {@code zooKeeper} client handle used by this viewer. */
    private void initZooKeeper() {
        // ...
    }

    /**
     * Lists the children of {@code /results}, re-registering this object as the
     * child watcher, and prints the UTF-8 decoded payload of each child.
     *
     * <p>No data watch is set on the individual result nodes: {@link #process}
     * only reacts to {@code NodeChildrenChanged}, so such watches would fire
     * events that are immediately discarded.
     *
     * <p>NOTE(review): every invocation reads all current children again, so a
     * result that is still present is printed once per child-change event —
     * confirm whether that repetition is intended.
     */
    private void listenForResult() {
        try {
            List<String> resultNodes = zooKeeper.getChildren("/results", this);
            for (String resultNode : resultNodes) {
                byte[] resultData = zooKeeper.getData("/results/" + resultNode, false, null);
                // getData returns null for a node created without data; treat that as an empty payload.
                String payload = (resultData == null)
                        ? ""
                        : new String(resultData, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Result received: " + payload);
                // ...
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Watch callback: re-lists the results whenever the children of the watched
     * node change. All other event types are ignored.
     *
     * @param event the event delivered by the ZooKeeper client
     */
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
            listenForResult();
        }
    }
}
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.session.timeout=30000
[zk: localhost:2181(CONNECTED) 0] addauth digest username:password
[zk: localhost:2181(CONNECTED) 1] setAcl /tasks digest:username:password:crwda
[zk: localhost:2181(CONNECTED) 2] setAcl /results digest:username:password:crwda