import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
public class ParquetWriterExample {
    public static void main(String[] args) throws Exception {
        String schemaString = "message User {\n" +
                "  required int32 id;\n" +
                "  required binary name;\n" +
                "}";
        MessageType schema = MessageTypeParser.parseMessageType(schemaString);
        Path filePath = new Path("data.parquet");
        // The WriteSupport needs the schema so that init() can return it in the WriteContext.
        ParquetWriter<User> writer = new ParquetWriter<>(filePath, new UserWriteSupport(schema),
                CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE);
        User user1 = new User(1, "John");
        User user2 = new User(2, "Jane");
        writer.write(user1);
        writer.write(user2);
        writer.close();
    }
}
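// Note: instead of parsing a schema string, the same schema can be built with
// the fluent org.apache.parquet.schema.Types builder. A minimal sketch,
// equivalent to the string above (requires imports of Types and PrimitiveType):
//
//   MessageType schema = Types.buildMessage()
//           .required(PrimitiveType.PrimitiveTypeName.INT32).named("id")
//           .required(PrimitiveType.PrimitiveTypeName.BINARY).named("name")
//           .named("User");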
class User {
    public int id;
    public String name;
    public User() {
        // no-arg constructor used by the reader's record materializer
    }
    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }
}
class UserWriteSupport extends WriteSupport<User> {
    private final MessageType schema;
    private RecordConsumer recordConsumer;
    UserWriteSupport(MessageType schema) {
        this.schema = schema;
    }
    @Override
    public WriteContext init(Configuration configuration) {
        return new WriteContext(schema, new java.util.HashMap<String, String>());
    }
    @Override
    public void prepareForWrite(RecordConsumer recordConsumer) {
        this.recordConsumer = recordConsumer;
    }
    @Override
    public void write(User user) {
        recordConsumer.startMessage();
        recordConsumer.startField("id", 0);
        recordConsumer.addInteger(user.id);
        recordConsumer.endField("id", 0);
        recordConsumer.startField("name", 1);
        recordConsumer.addBinary(Binary.fromCharSequence(user.name));
        recordConsumer.endField("name", 1);
        recordConsumer.endMessage();
    }
}
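If a custom WriteSupport is more machinery than you need, parquet-hadoop also ships ready-made example classes (in org.apache.parquet.hadoop.example) that write generic Group records instead of POJOs. A minimal sketch of the same write using ExampleParquetWriter, assuming those example classes are on the classpath:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class GroupWriterExample {
    public static void main(String[] args) throws Exception {
        MessageType schema = MessageTypeParser.parseMessageType(
                "message User {\n" +
                "  required int32 id;\n" +
                "  required binary name (UTF8);\n" +
                "}");
        // ExampleParquetWriter wires up GroupWriteSupport for us.
        ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("data.parquet"))
                .withType(schema)
                .withCompressionCodec(CompressionCodecName.GZIP)
                .build();
        SimpleGroupFactory factory = new SimpleGroupFactory(schema);
        writer.write(factory.newGroup().append("id", 1).append("name", "John"));
        writer.write(factory.newGroup().append("id", 2).append("name", "Jane"));
        writer.close();
    }
}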
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
public class ParquetReaderExample {
    public static void main(String[] args) throws Exception {
        Path filePath = new Path("data.parquet");
        ParquetReader<User> reader = ParquetReader.builder(new UserReadSupport(), filePath).build();
        User user;
        while ((user = reader.read()) != null) {
            System.out.println("ID: " + user.id + ", Name: " + user.name);
        }
        reader.close();
    }
}
class UserReadSupport extends ReadSupport<User> {
    @Override
    public ReadContext init(InitContext context) {
        // Read the full file schema; no projection or filtering here.
        return new ReadContext(context.getFileSchema());
    }
    @Override
    public RecordMaterializer<User> prepareForRead(Configuration configuration,
            java.util.Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext readContext) {
        return new UserRecordMaterializer();
    }
}
class UserRecordMaterializer extends RecordMaterializer<User> {
    private User user;
    // RecordMaterializer exposes only getCurrentRecord() and getRootConverter();
    // per-field values arrive through converters, one per column in the schema.
    private final GroupConverter root = new GroupConverter() {
        private final Converter[] converters = {
            new PrimitiveConverter() { // field 0: id
                @Override
                public void addInt(int value) {
                    user.id = value;
                }
            },
            new PrimitiveConverter() { // field 1: name
                @Override
                public void addBinary(Binary value) {
                    user.name = value.toStringUsingUTF8();
                }
            }
        };
        @Override
        public Converter getConverter(int fieldIndex) {
            return converters[fieldIndex];
        }
        @Override
        public void start() {
            user = new User(); // a fresh record for each message
        }
        @Override
        public void end() {
            // no-op: the record is complete once end() is called
        }
    };
    @Override
    public User getCurrentRecord() {
        return user;
    }
    @Override
    public GroupConverter getRootConverter() {
        return root;
    }
}
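The matching shortcut exists on the read side: GroupReadSupport (also in org.apache.parquet.hadoop.example) materializes each record as a generic Group, so no hand-written ReadSupport or converters are required. A sketch under the same classpath assumption:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class GroupReaderExample {
    public static void main(String[] args) throws Exception {
        ParquetReader<Group> reader =
                ParquetReader.builder(new GroupReadSupport(), new Path("data.parquet")).build();
        Group group;
        while ((group = reader.read()) != null) {
            // Field access is by name plus repetition index (0 for non-repeated fields).
            System.out.println("ID: " + group.getInteger("id", 0)
                    + ", Name: " + group.getString("name", 0));
        }
        reader.close();
    }
}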