Reading and Writing Parquet Files in Different Languages
In Python, the easiest option is the fastparquet package. If you're using conda, simply type:
conda install fastparquet
Fastparquet is an excellent Python implementation and my personal favorite. Its ease of use and stability make it stand out among other implementations. Internally it uses some native code to speed up data processing, and it's even faster than the native Java implementation.
To read a Parquet file, write the following code:
from fastparquet import ParquetFile
from fastparquet import write
pf = ParquetFile(test_file)  # test_file is the path to an existing Parquet file
df = pf.to_pandas()
which gives you a Pandas DataFrame. Writing is also trivial. Given a DataFrame, use this code to write it:
write(file_path, df, compression="UNCOMPRESSED")
The compression argument has several options and defaults to SNAPPY. Unfortunately, SNAPPY requires another package (python-snappy) that is not pulled in automatically with fastparquet, therefore I like to specify the compression option explicitly. The options are:
UNCOMPRESSED
SNAPPY
GZIP
With Java things are a bit more complicated. If you're using Maven, you can pull in this minimal set of dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aloneguid.parquet</groupId>
<artifactId>perf</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.1</version>
</dependency>
</dependencies>
</project>
To read files you would use the AvroParquetReader class, and AvroParquetWriter to write them. However, it's hard to find any documentation on how to use them, therefore I'll provide the full source code here (as short as I can) for your convenience:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class Main {
private static final Configuration conf = new Configuration();
public static void main(String[] args) throws IOException {
Path file = new Path("C:\\dev\\parquet-dotnet\\src\\Parquet.Test\\data\\customer.impala.parquet");
Path outUncompressed = new Path("c:\\tmp\\java.uncompressed.parquet");
Path outGzipped = new Path("c:\\tmp\\java.gzip.parquet");
List<Long> readTimes = new ArrayList<Long>();
List<Long> writeUTimes = new ArrayList<Long>();
List<Long> writeGTimes = new ArrayList<Long>();
List<GenericRecord> allRecords = new ArrayList<GenericRecord>();
Schema schema = null;
for(int i = 0; i < 11; i++) {
//read
TimeWatch readTime = TimeWatch.start();
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build();
GenericRecord record;
while((record = reader.read()) != null) {
if(i == 0) {
//add once
allRecords.add(record);
if(schema == null) {
schema = record.getSchema();
}
}
}
reader.close();
long readMs = readTime.time();
if(i != 0) {
readTimes.add(readMs);
}
//write (uncompressed, then gzip); writeTest deletes the target file,
//times the write, and discards iteration 0 as a warm-up
writeTest(i, CompressionCodecName.UNCOMPRESSED, writeUTimes, outUncompressed,
schema, allRecords);
writeTest(i, CompressionCodecName.GZIP, writeGTimes, outGzipped,
schema, allRecords);
}
System.out.println("mean (read): " + avg(readTimes));
System.out.println("mean (write uncompressed): " + avg(writeUTimes));
System.out.println("mean (write gzip): " + avg(writeGTimes));
}
private static void writeTest(int iteration, CompressionCodecName codec, List<Long> times,
Path destPath, Schema schema, List<GenericRecord> records) throws IOException {
File t = new File(destPath.toString());
t.delete();
TimeWatch timer = TimeWatch.start();
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(destPath)
.withCompressionCodec(codec)
.withSchema(schema)
.build();
for(GenericRecord wr: records) {
writer.write(wr);
}
writer.close();
if(iteration != 0) {
times.add(timer.time());
}
}
private static Long avg(List<Long> list) {
long sum = 0;
for(Long time : list) {
sum += time;
}
return sum / list.size();
}
}
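Note that TimeWatch above is not a standard library class but a small timing helper the listing assumes. A minimal implementation, under the assumption that time() returns elapsed milliseconds, could look like this:

```java
public class TimeWatch {
    private final long start;  // nanoseconds at construction

    private TimeWatch() {
        this.start = System.nanoTime();
    }

    // begin timing
    public static TimeWatch start() {
        return new TimeWatch();
    }

    // elapsed milliseconds since start()
    public long time() {
        return (System.nanoTime() - start) / 1_000_000;
    }
}
```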
For .NET, there's a good library as well (and I'm the author of it) called parquet-dotnet. The library provides first-class support for all major OSes and is a pure .NET citizen. It's available on NuGet and requires zero setup. It's self-explanatory and has plenty of samples on the front page.
Thanks for reading. If you would like to follow future posts, please subscribe to my RSS feed and/or follow me on Twitter.