I have meet a problem about hadoop2.2.0 append operation. I append some bytes to a hdfs file by HDFS java API.First I will create the target file if the file doesn't exist before appending operation, the codes like:
String fileUri = "hdfs://hadoopmaster:9000/in/append_test.txt";
// create the hdfs file, if not exists
HdfsClient.createPathIfNotExist(fileUri);
// do 5 times append operation
for (int i=0; i<5; i++){
HdfsClient.appendTo(fileUri, ("append content"+i).getBytes("UTF-8"));
}
The createPathIfNotExist
function:
Path p = null;
FileSystem fs = null;
try {
fs = FileSystem.get(URI.create(uri), conf);
p = new Path(uri);
if (!fs.exists(p)) {
if (uri.charAt(uri.length() - 1) == '/'){ //create a directory
if(fs.mkdirs(p)){
// create successfully
}
}else{ //create a file
FSDataOutputStream fos = fs.create(p);
fos.close();
}
} else{
System.out.println(uri + "existing");
}
} catch (IOException e) {
e.printStackTrace();
} finally{
if (fs != null)
try {
fs.close();
fs = null;
} catch (IOException e) {
e.printStackTrace();
}
}
The appendTo
function:
ByteArrayInputStream in = null;
OutputStream out = null;
FileSystem fs = null;
try {
in = new ByteArrayInputStream(bytes);
fs = FileSystem.get(URI.create(uri), conf);
out = fs.append(new Path(uri)); //get append outputstream
IOUtils.copyBytes(in, out, bufferSize, false);
} catch(Exception e){
e.printStackTrace();
} finally{
if (in != null) IOUtils.closeStream(in);
if (out != null) IOUtils.closeStream(out);
if (fs != null){
try {
fs.close();
fs = null;
} catch (IOException e) {
e.printStackTrace();
}
}
}
The result is the append_test.txt is created, but the content only has:
append content0
And occur exception:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to create file [/in/append_test.txt] for [DFSClient_NONMAPREDUCE_-1148656837_1] on client [192.168.141.1], because this file is already being created by [DFSClient_NONMAPREDUCE_2099912242_1] on [192.168.141.1]
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2320)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2153)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2386)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2347)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:508)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:320)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59572)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)
at org.apache.hadoop.ipc.Client.call(Client.java:1347)
at org.apache.hadoop.ipc.Client.call(Client.java:1300)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy10.append(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy10.append(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:245)
at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1480)
at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1520)
at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1508)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:310)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:306)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:306)
at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1160)
at org.lh.blog.message.hadoop.HdfsClient$2.run(HdfsClient.java:130)
at org.lh.blog.message.hadoop.HdfsClient$2.run(HdfsClient.java:1)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:356)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1471)
at org.lh.blog.message.hadoop.HdfsClient.appendTo(HdfsClient.java:121)
at org.lh.blog.message.hadoop.HdfsClient.appendTo(HdfsClient.java:110)
at org.lh.blog.message.test.HdfsClientTests.testCreateFileBeforeAppend(HdfsClientTests.java:26)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
That says, it only did one time append operation after creating the nonexist file, other 4 append operations failed, occur above errors.
I have created the file before append, but it says AlreadyBeingCreatedException
, I am some confused.
I also have some tries. I found the hdfs files created by java API, all can't do append operation. But the hdfs files created by hdfs command(etc, "hdfs dfs -put"), can do append operation.
Can you help me, give me some suggestions?
Thanks & Regards.
To solve the issue,
This process worked fine for me and solved the issue.
APPEND operation is expensive and if you're trying to attempt it in parallel, then this issue my arise. Hence re-create file and re-write the contents to it rather than appending.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With