Spark on Azure Databricks: Read Avro Files with a Date Range

Sajith vijesekara
Apr 10, 2020 · 2 min read

For the last couple of days I have been working on analyzing a Spark stream in Azure Databricks. We receive a stream of files which are saved to file storage, and the files are laid out in this format:

/mnt/twiterData/data/stream/0/2020/04/10/02/10/54.avro

Our requirement is to read the files within a date range, e.g. 2020/03/26 to 2020/04/10.

Our first idea was to use a glob pattern to read the files.

Solution 1

// One glob pattern for the March days and one for the April days in the range
val path1 = "/mnt/twiterData/data/stream/*/2020/03/{28,29,30}/*/*/*.avro"
val path2 = "/mnt/twiterData/data/stream/*/2020/04/{01,02,03}/*/*/*.avro"
val avroDf = spark.read.format("com.databricks.spark.avro").load(path1, path2)
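
For a longer range it gets tedious to hand-write those brace lists, so they can be generated from the range itself. Below is a minimal sketch, not from the original post: the helper name globPatternsFor is my own, and it simply groups the days of the range by year and month, emitting one glob pattern per month.

import java.time.LocalDate

// Build one glob pattern per (year, month) covered by the range, with a
// brace list of the zero-padded days that fall inside that month.
def globPatternsFor(start: LocalDate, end: LocalDate): Seq[String] = {
  val days = Iterator.iterate(start)(_.plusDays(1)).takeWhile(!_.isAfter(end)).toSeq
  days.groupBy(d => (d.getYear, d.getMonthValue)).toSeq.sortBy(_._1).map {
    case ((year, month), ds) =>
      val dayList = ds.map(d => f"${d.getDayOfMonth}%02d").mkString(",")
      f"/*/$year%d/$month%02d/{$dayList}/*/*/*.avro"
  }
}

// globPatternsFor(LocalDate.of(2020, 3, 26), LocalDate.of(2020, 4, 10)) yields:
//   /*/2020/03/{26,27,28,29,30,31}/*/*/*.avro
//   /*/2020/04/{01,02,03,04,05,06,07,08,09,10}/*/*/*.avro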

But the problem is that if there are any empty folders, or a pattern matches nothing, this approach throws an error. The next solution is to resolve the file paths first using SparkHadoopUtil. When we pass it the glob pattern, it iterates over the base path and returns the list of available file paths, so only valid, existing paths are handed to the reader.

Solution 2

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import java.net.URI

// Expand a glob pattern under a base path and return only the file
// paths that actually exist, so empty folders are skipped.
def listFiles(basep: String, globp: String): Seq[String] = {
  val conf = new Configuration(sc.hadoopConfiguration)
  val fs = FileSystem.get(new URI(basep), conf)

  // Make sure a path fragment starts with "/" before merging.
  def validated(path: String): Path = {
    if (path.startsWith("/")) new Path(path)
    else new Path("/" + path)
  }

  // Resolve the glob to concrete directories, then list their leaf files.
  val fileCatalog = InMemoryFileIndex.bulkListLeafFiles(
    paths = SparkHadoopUtil.get.globPath(fs, Path.mergePaths(validated(basep), validated(globp))),
    hadoopConf = conf,
    filter = null,
    sparkSession = spark)

  // On Databricks the returned file status exposes `path` directly;
  // on open-source Spark use `_.getPath.toString` instead.
  fileCatalog.flatMap(_._2.map(_.path))
}

val rootPath = "/mnt/twiterData/data/stream/"
val filePathFormat = "/*/2020/04/{01,02,03}/*/*/*.avro"

// Resolve the glob to the files that actually exist, then load them all.
val allFilesPath = listFiles(rootPath, filePathFormat)
val avroDf = spark.read.format("com.databricks.spark.avro").load(allFilesPath: _*)
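
One caveat: if the pattern matches nothing, listFiles returns an empty list, and calling load with no paths will then fail when Spark tries to infer the schema. A small guard (my own addition, not in the original post) makes that failure explicit:

// Fail fast with a clear message when no files matched the pattern
require(allFilesPath.nonEmpty, s"No avro files found for $filePathFormat under $rootPath")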

If your date range spans multiple glob patterns, iterate over them one by one, collect the file paths into a single list, and then pass that list to the Spark reader, as sketched below.
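
As a rough sketch of that last step, combining the hypothetical globPatternsFor helper from above with the listFiles helper from Solution 2:

import java.time.LocalDate

// One pattern per month in the range, each resolved to the files that
// actually exist, then everything loaded in a single read.
val patterns = globPatternsFor(LocalDate.of(2020, 3, 26), LocalDate.of(2020, 4, 10))
val allPaths = patterns.flatMap(p => listFiles(rootPath, p))
val rangeDf = spark.read.format("com.databricks.spark.avro").load(allPaths: _*)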

