I am running a local YARN cluster with 8 vCores and 8 GB of total memory.
The workflow is as follows:
YarnClient submits an application request that starts the AppMaster in a container.
The AppMaster starts, creates amRMClient and nmClient, registers itself with the RM, and then creates 4 container requests for worker threads via amRMClient.addContainerRequest.
Even though enough resources are available, the containers are never allocated (the callback's onContainersAllocated method is never called). I inspected the NodeManager and ResourceManager logs and don't see any lines related to the container requests. I followed the Apache docs closely and can't understand what I'm doing wrong.
For reference, here is the AppMaster code:
@Override
public void run() {
Map<String, String> envs = System.getenv();
String containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.toString());
if (containerIdString == null) {
// container id should always be set in the env by the framework
throw new IllegalArgumentException("ContainerId not set in the environment");
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
LOG.info("Starting AppMaster Client...");
YarnAMRMCallbackHandler amHandler = new YarnAMRMCallbackHandler(allocatedYarnContainers);
// TODO: get heartbeat interval from config instead of the hard-coded 1000 ms
amClient = AMRMClientAsync.createAMRMClientAsync(1000, this);
amClient.init(config);
amClient.start();
LOG.info("Starting AppMaster Client OK");
//YarnNMCallbackHandler nmHandler = new YarnNMCallbackHandler();
containerManager = NMClient.createNMClient();
containerManager.init(config);
containerManager.start();
// Get port, URL information. TODO: get tracking URL
String appMasterHostname = NetUtils.getHostname();
String appMasterTrackingUrl = "/progress";
// Register self with ResourceManager. This will start heart-beating to the RM
RegisterApplicationMasterResponse response = null;
LOG.info("Register AppMaster on: " + appMasterHostname + "...");
try {
response = amClient.registerApplicationMaster(appMasterHostname, 0, appMasterTrackingUrl);
} catch (YarnException | IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return;
}
LOG.info("Register AppMaster OK");
// Dump out information about cluster capability as seen by the resource manager
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
containerMemory = Integer.parseInt(config.get(YarnConfig.YARN_CONTAINER_MEMORY_MB));
containerCores = Integer.parseInt(config.get(YarnConfig.YARN_CONTAINER_CPU_CORES));
// A resource ask cannot exceed the max.
if (containerMemory > maxMem) {
LOG.info("Container memory specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerMemory + ", max="
+ maxMem);
containerMemory = maxMem;
}
if (containerCores > maxVCores) {
LOG.info("Container virtual cores specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerCores + ", max=" + maxVCores);
containerCores = maxVCores;
}
List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size()
+ " previous AM's running containers on AM registration.");
for (int i = 0; i < 4; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amClient.addContainerRequest(containerAsk); // NOTHING HAPPENS HERE...
LOG.info("Available resources: " + amClient.getAvailableResources().toString());
}
while(completedYarnContainers != 4) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOG.info("Done with allocation!");
}
@Override
public void onContainersAllocated(List<Container> containers) {
LOG.info("Got response from RM for container ask, allocatedCnt=" + containers.size());
for (Container container : containers) {
LOG.info("Allocated yarn container with id: " + container.getId());
allocatedYarnContainers.push(container);
// TODO: Launch the container in a thread
}
}
@Override
public void onError(Throwable error) {
LOG.error(error.getMessage());
}
@Override
public float getProgress() {
return (float) completedYarnContainers / allocatedYarnContainers.size();
}
Here is the output from jps:
14594 NameNode
15269 DataNode
17975 Jps
14666 ResourceManager
14702 NodeManager
And here is the AppMaster log for initialization and the 4 container requests:
23:47:09 YarnAppMaster - Starting AppMaster Client OK
23:47:09 YarnAppMaster - Register AppMaster on: andrei-mbp.local/192.168.1.4...
23:47:09 YarnAppMaster - Register AppMaster OK
23:47:09 YarnAppMaster - Max mem capabililty of resources in this cluster 2048
23:47:09 YarnAppMaster - Max vcores capabililty of resources in this cluster 2
23:47:09 YarnAppMaster - Received 0 previous AM's running containers on AM registration.
23:47:11 YarnAppMaster - Requested container ask: Capability[<memory:512, vCores:1>]Priority[0]
23:47:11 YarnAppMaster - Available resources: <memory:7680, vCores:0>
23:47:11 YarnAppMaster - Requested container ask: Capability[<memory:512, vCores:1>]Priority[0]
23:47:11 YarnAppMaster - Available resources: <memory:7680, vCores:0>
23:47:11 YarnAppMaster - Requested container ask: Capability[<memory:512, vCores:1>]Priority[0]
23:47:11 YarnAppMaster - Available resources: <memory:7680, vCores:0>
23:47:11 YarnAppMaster - Requested container ask: Capability[<memory:512, vCores:1>]Priority[0]
23:47:11 YarnAppMaster - Available resources: <memory:7680, vCores:0>
23:47:11 YarnAppMaster - Progress indicator should not be negative
Thanks in advance.
I suspect the problem comes exactly from the negative progress:
23:47:11 YarnAppMaster - Progress indicator should not be negative
Note that, since you are using AMRMClientAsync, requests are not sent immediately when you call addContainerRequest. A heartbeat function runs periodically, and it is in this function that allocate is called and the pending requests are sent. The progress value used by this function starts at 0 but is updated with the value returned by your handler once a response from allocate is obtained.
The first allocate is supposedly done right after the register call, so getProgress should be called then to update the stored progress. As written, your progress becomes NaN because at that point allocatedYarnContainers is empty and completedYarnContainers is 0, so the returned value is 0/0, which is undefined (NaN in floating point). When the next allocate checks your progress value, the check fails because every ordered comparison with NaN returns false, so no further allocate call actually communicates with the ResourceManager: each one quits at that first step with an exception.
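The arithmetic behind this can be reproduced in isolation. The following is a minimal standalone sketch, not Hadoop code: the names NaNProgressDemo, ratio, and passesNonNegativeCheck are hypothetical, and the check is only an assumed stand-in for whatever validation produces the "Progress indicator should not be negative" message.

```java
public class NaNProgressDemo {
    // Same shape as the getProgress() in the question:
    // a float division of two integer counters.
    static float ratio(int completed, int allocated) {
        return (float) completed / allocated;
    }

    // Assumed stand-in for a "progress must not be negative" validation.
    // For NaN this comparison is false, so the check is treated as failed.
    static boolean passesNonNegativeCheck(float progress) {
        return progress >= 0;
    }

    public static void main(String[] args) {
        float progress = ratio(0, 0); // 0.0f / 0.0f
        System.out.println("isNaN: " + Float.isNaN(progress));
        System.out.println("passes check: " + passesNonNegativeCheck(progress));
    }
}
```

So a value that is not actually negative can still trip a "should not be negative" check, which would match the last line of the AppMaster log.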
Try changing your progress function to the following:
@Override
public float getProgress() {
return (float) allocatedYarnContainers.size() / 4.0f;
}
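If you would rather not hard-code the 4, one possible variant is to guard the division and clamp the result. This is only a sketch under assumptions: SafeProgress and its parameters are hypothetical names, and the counters would have to come from your own AppMaster fields.

```java
public class SafeProgress {
    // Returns a progress value in [0, 1] and never NaN.
    // 'done' and 'requested' are hypothetical counters supplied by the caller.
    static float progress(int done, int requested) {
        if (requested <= 0) {
            return 0.0f; // nothing requested yet: report zero instead of 0/0
        }
        float p = (float) done / requested;
        return Math.max(0.0f, Math.min(1.0f, p)); // clamp to the valid range
    }

    public static void main(String[] args) {
        System.out.println(progress(0, 0));
        System.out.println(progress(2, 4));
    }
}
```

Inside getProgress() you would then return something like SafeProgress.progress(allocatedYarnContainers.size(), 4), or pass the number of completed containers if that is what you want to report.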
(Note: copied to StackOverflow for posterity from here.)