// ===== ParquetWriterExample.java =====
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetWriterExample {

    public static void main(String[] args) throws Exception {
        // Define the Parquet schema: one int32 column and one UTF-8 string column.
        String schemaString = "message User { "
                + "  required int32 id; "
                + "  required binary name (UTF8); "
                + "}";
        MessageType schema = MessageTypeParser.parseMessageType(schemaString);

        Path filePath = new Path("data.parquet");

        // (In recent Parquet releases the builder API is preferred over this
        // deprecated five-argument constructor, but it still works.)
        ParquetWriter<User> writer = new ParquetWriter<>(
                filePath,
                new UserWriteSupport(schema),
                CompressionCodecName.GZIP,
                ParquetWriter.DEFAULT_BLOCK_SIZE,
                ParquetWriter.DEFAULT_PAGE_SIZE);

        writer.write(new User(1, "John"));
        writer.write(new User(2, "Jane"));
        writer.close();
    }
}

class User {
    public int id;
    public String name;

    public User() {
        // no-arg constructor needed by the read path below
    }

    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

class UserWriteSupport extends WriteSupport<User> {
    private final MessageType schema;
    private RecordConsumer recordConsumer;

    UserWriteSupport(MessageType schema) {
        this.schema = schema;
    }

    @Override
    public WriteContext init(Configuration configuration) {
        // The schema returned here is what gets written into the file footer.
        return new WriteContext(schema, new HashMap<String, String>());
    }

    @Override
    public void prepareForWrite(RecordConsumer recordConsumer) {
        this.recordConsumer = recordConsumer;
    }

    @Override
    public void write(User user) {
        // Emit one record as start/end events around each field's value.
        recordConsumer.startMessage();
        recordConsumer.startField("id", 0);
        recordConsumer.addInteger(user.id);
        recordConsumer.endField("id", 0);
        recordConsumer.startField("name", 1);
        recordConsumer.addBinary(Binary.fromCharSequence(user.name));
        recordConsumer.endField("name", 1);
        recordConsumer.endMessage();
    }
}

// ===== ParquetReaderExample.java =====
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;

public class ParquetReaderExample {

    public static void main(String[] args) throws Exception {
        Path filePath = new Path("data.parquet");
        ParquetReader<User> reader =
                ParquetReader.builder(new UserReadSupport(), filePath).build();

        User user;
        while ((user = reader.read()) != null) {
            System.out.println("ID: " + user.id + ", Name: " + user.name);
        }
        reader.close();
    }
}

class UserReadSupport extends ReadSupport<User> {

    @Override
    public ReadContext init(InitContext context) {
        // Request the full file schema; a column projection could be passed instead.
        return new ReadContext(context.getFileSchema());
    }

    @Override
    public RecordMaterializer<User> prepareForRead(Configuration configuration,
            Map<String, String> keyValueMetaData, MessageType fileSchema,
            ReadContext readContext) {
        return new UserRecordMaterializer();
    }
}

// Assembles User objects from low-level read events. Unlike RecordConsumer on
// the write path, the read path delivers values through a converter tree: a
// GroupConverter for the record plus one PrimitiveConverter per column.
class UserRecordMaterializer extends RecordMaterializer<User> {
    private User user;

    private final GroupConverter root = new GroupConverter() {
        // Converter for column 0 (id).
        private final PrimitiveConverter idConverter = new PrimitiveConverter() {
            @Override
            public void addInt(int value) {
                user.id = value;
            }
        };

        // Converter for column 1 (name).
        private final PrimitiveConverter nameConverter = new PrimitiveConverter() {
            @Override
            public void addBinary(Binary value) {
                user.name = value.toStringUsingUTF8();
            }
        };

        @Override
        public Converter getConverter(int fieldIndex) {
            return fieldIndex == 0 ? idConverter : nameConverter;
        }

        @Override
        public void start() {
            user = new User(); // a new record begins
        }

        @Override
        public void end() {
            // nothing to do; the record is complete
        }
    };

    @Override
    public User getCurrentRecord() {
        return user;
    }

    @Override
    public GroupConverter getRootConverter() {
        return root;
    }
}
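To compile and run the two examples, only the parquet-hadoop artifact (which pulls in parquet-column, where Binary and the converter classes live) and a Hadoop client (for Configuration and Path) are needed on the classpath. A minimal Maven sketch follows; the version numbers are assumptions, so substitute whatever your environment targets:

<dependencies>
  <!-- ParquetWriter/ParquetReader, WriteSupport, ReadSupport, converters -->
  <dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.13.1</version> <!-- assumed version -->
  </dependency>
  <!-- org.apache.hadoop.conf.Configuration and org.apache.hadoop.fs.Path -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.6</version> <!-- assumed version -->
  </dependency>
</dependencies>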

