Faster munging for the Wikidata Query Service using Hadoop

October 14, 2020, by addshore

The Wikidata Query Service is a public SPARQL endpoint for querying all of the data contained within Wikidata. In a previous blog post I walked through how to set up a complete copy of this query service. One of the steps in this process is the munge step, which performs some pre-processing on the RDF dump that comes directly from Wikidata.

Back in 2019 this step took roughly 20 hours, and as Wikidata has continued to grow it now takes somewhere between one and two days. The original munge step (munge.sh) makes use of only a single CPU. The WMF has been experimenting for some time with performing this step in their Hadoop cluster as part of their modern update mechanism (the streaming updater). An additional patch has now also made this useful for the current default load process (using loadData.sh).

This post walks through using the new Hadoop based munge step with the latest Wikidata TTL dump on Google Cloud’s Dataproc service. This cuts the munge time down from 1-2 days to just 2 hours using an 8-worker cluster. Even faster times can be expected with more workers, all the way down to ~20 minutes.

Prerequisites

As this post uses the Google Cloud Dataproc service you’ll need a Google Cloud account with the Dataproc service enabled. You should be able to follow the steps using any Hadoop cluster, but you may need to alter most of the commands.

You also need to have the latest latest-all.ttl.bz2 file downloaded from the Wikimedia dumps site or a mirror. After some testing I found that the dumps.wikimedia.your.org mirror appears to have the fastest download speed. The dump needs to be accessible from your Hadoop cluster.

For our project, that means using a small instance to download the dump straight into a bucket.

curl https://dumps.wikimedia.your.org/wikidatawiki/entities/latest-all.ttl.bz2 | gsutil cp - gs://myBucketName/wikidata-all.ttl.bz2
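
Once the copy finishes it is worth confirming that the object actually landed in the bucket before spinning up a cluster (same bucket and object name as above):

gsutil ls -l gs://myBucketName/wikidata-all.ttl.bz2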

If you want to automate this step you could spin up a small VM that runs a startup script to download the dump to a bucket and then deletes itself (or delete it like this). This VM needs write access to the bucket (for example the “storage-rw” scope) as well as the “compute-rw” scope so that it can delete itself.
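
As a rough sketch of what that could look like: the instance name, zone and machine type below are placeholders, and the startup script simply repeats the curl/gsutil pipe from above before deleting the instance using its metadata-provided name and zone.

gcloud compute instances create wikidata-dump-fetcher \
--zone=us-central1-a --machine-type=n1-standard-1 \
--scopes=compute-rw,storage-rw \
--metadata=startup-script='#!/bin/bash
# Stream the dump straight into the bucket (no local copy needed)
curl https://dumps.wikimedia.your.org/wikidatawiki/entities/latest-all.ttl.bz2 | gsutil cp - gs://myBucketName/wikidata-all.ttl.bz2
# Look up our own name and zone from the metadata server, then self-delete
NAME=$(curl -s -H "Metadata-Flavor: Google" "http://metadata.google.internal/computeMetadata/v1/instance/name")
ZONE=$(curl -s -H "Metadata-Flavor: Google" "http://metadata.google.internal/computeMetadata/v1/instance/zone" | cut -d/ -f4)
gcloud --quiet compute instances delete "$NAME" --zone="$ZONE"'

Bear in mind that scopes only cap what the VM is allowed to do; the instance’s service account also needs permission to write to the bucket and to delete instances.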

Munging

First we need to create a new cluster that we can perform the munging on.

The command below creates a cluster with 8 workers, which takes a couple of hours to munge the dump. For the WMF production WDQS munge step this is performed on a cluster of 64 machines and takes ~20 minutes.

gcloud dataproc clusters create query-munge-1 \
--subnet default --enable-component-gateway --image-version 1.3-debian10 \
--master-machine-type n1-standard-4 --master-boot-disk-type pd-ssd --master-boot-disk-size 60 \
--worker-machine-type n1-standard-8 --worker-boot-disk-type pd-ssd --worker-boot-disk-size 60 --num-worker-local-ssds 1 --num-workers 8
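
Once the command returns you can double check that the cluster really came up with 8 workers before submitting anything to it:

gcloud dataproc clusters describe query-munge-1 --format="value(config.workerConfig.numInstances)"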

Then we need to fire up a job, referring to the dump that we have, that will perform the munge and write the data back to a bucket.

wget https://archiva.wikimedia.org/repository/releases/org/wikidata/query/rdf/rdf-spark-tools/0.3.49/rdf-spark-tools-0.3.49-jar-with-dependencies.jar
gcloud dataproc jobs submit spark --cluster=query-munge-1 \
--class=org.wikidata.query.rdf.spark.WikidataTurtleDumpConverter \
--jars=rdf-spark-tools-0.3.49-jar-with-dependencies.jar \
--properties=spark.master=yarn,\
spark.executor.memory=25G,\
spark.executor.cores=8,\
spark.driver.memory=2G,\
spark.dynamicAllocation.enabled=true,\
spark.dynamicAllocation.maxExecutors=64,\
fs.gs.block.size=536870912 \
-- \
--input-path gs://myBucketName/wikidata-all.ttl.bz2 \
--num-partitions 512 \
--output-path gs://myBucketName/munged/nt \
--output-format nt \
--skolemize

You’ll be able to monitor the job in the Google Cloud UI, and should slowly see the gzip files appearing in the bucket.
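
If you prefer a terminal over the Cloud Console, the same information is available on the command line; the first command shows the job state and the second lists the munged part files as they land in the bucket:

gcloud dataproc jobs list --cluster=query-munge-1
gsutil ls -lh gs://myBucketName/munged/nt/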

Once the job is complete you can delete the cluster.

gcloud dataproc clusters delete query-munge-1

Preparing for load

I won’t cover loading the data into Blazegraph here; if you need a guide for that, see this post.

One thing you will need to do in order for the WDQS load scripts to read the files is to rename them slightly. When you come to grab the data from the bucket for loading on a host, you can perform the download and rename as below.

gsutil -m cp -r gs://myBucketName/munged/nt /mnt/ssdRead

apt-get update
apt-get install -y rename

rename -v 's/(part-\d+)(\.gz)/$1.ttl$2/' /mnt/ssdRead/nt/*
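
If the rename utility is not available on your load host, a plain bash loop over the same directory does the same job:

# Insert .ttl before the .gz suffix, e.g. part-00000.gz -> part-00000.ttl.gz
for f in /mnt/ssdRead/nt/part-*.gz; do
  mv "$f" "${f%.gz}.ttl.gz"
done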

Concluding

This is part 2 in a series of blog posts about running your own copy of the Wikidata Query Service. If you missed the first part, check it out here.

Many thanks to DCausse and the rest of the WMF search platform team for helping prepare and release the final patch needed for this post. Most of the code for the Hadoop munge step was written by the WMF team; only the final touch allowing export of gzipped nt files was worked on by me.

You must be using version 0.3.49 or greater of the WDQS jars for this to work. There is not currently a WMDE-released Docker image that lines up with this release.

The Spark job should work on any Hadoop cluster; Google Cloud is simply used as a convenient, easily reproducible example.

I have not performed a full data load with this “flavour” of munged files yet, but it should be fine.