public class JobDispatcher { public static void main(String[] args) { ZooKeeper zooKeeper = createZooKeeper(); String taskNode = createTaskNode(zooKeeper, "taskData"); System.out.println("Task created: " + taskNode); } private static ZooKeeper createZooKeeper() { // ... } private static String createTaskNode(ZooKeeper zooKeeper, String taskData) { try { byte[] data = taskData.getBytes(); String taskPath = zooKeeper.create("/tasks/task-", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return taskPath; } catch (Exception e) { e.printStackTrace(); } return null; } } public class JobExecutor implements Watcher { private ZooKeeper zooKeeper; public static void main(String[] args) { JobExecutor executor = new JobExecutor(); executor.initZooKeeper(); executor.listenForTask(); } private void initZooKeeper() { // ... } private void listenForTask() { try { List<String> taskNodes = zooKeeper.getChildren("/tasks", this); for (String taskNode : taskNodes) { byte[] taskData = zooKeeper.getData("/tasks/" + taskNode, this, null); System.out.println("Task received: " + new String(taskData)); // ... } } catch (Exception e) { e.printStackTrace(); } } @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeChildrenChanged) { listenForTask(); } } } public class JobResultViewer implements Watcher { private ZooKeeper zooKeeper; public static void main(String[] args) { JobResultViewer viewer = new JobResultViewer(); viewer.initZooKeeper(); viewer.listenForResult(); } private void initZooKeeper() { // ... } private void listenForResult() { try { List<String> resultNodes = zooKeeper.getChildren("/results", this); for (String resultNode : resultNodes) { byte[] resultData = zooKeeper.getData("/results/" + resultNode, this, null); System.out.println("Result received: " + new String(resultData)); // ... } } catch (Exception e) { e.printStackTrace(); } } @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeChildrenChanged) { listenForResult(); } } } zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 zookeeper.session.timeout=30000 [zk: localhost:2181(CONNECTED) 0] addauth digest username:password [zk: localhost:2181(CONNECTED) 1] setAcl /tasks digest:username:password:crwda [zk: localhost:2181(CONNECTED) 2] setAcl /results digest:username:password:crwda


上一篇:
下一篇:
切换中文