Spark Azure DataBricks Read Avro file with Date Range
For the last couple of days I have been analyzing a Spark stream in Azure Databricks. We receive a stream of files that are saved to file storage, under folders named by year, month and day (the paths in the solutions reflect this layout).
Our requirement is to read only the files within a specific date range.
Ex: 2020/03/26 to 2020/04/10
We first thought of using a glob (wildcard) pattern in the path to select the files.
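Writing the day lists by hand gets tedious for longer ranges. As a minimal sketch, the patterns could be generated from the start and end dates, one pattern per month. The helper name globsForRange is hypothetical, and the folder layout is assumed from the paths used in this post.

```scala
import java.time.LocalDate

// Hypothetical helper: builds one glob pattern per (year, month) in the
// inclusive date range. The "/*/yyyy/MM/{dd,...}/*/*/*.avro" layout is an
// assumption taken from the paths used in this post.
def globsForRange(start: LocalDate, end: LocalDate): Seq[String] = {
  require(!end.isBefore(start), "end must not be before start")
  // Enumerate every day from start to end, inclusive.
  val days = Iterator.iterate(start)(d => d.plusDays(1))
    .takeWhile(d => !d.isAfter(end))
    .toSeq
  days
    .groupBy(d => (d.getYear, d.getMonthValue)) // one group per month
    .toSeq
    .sortBy(_._1)                               // chronological order
    .map { case ((year, month), ds) =>
      val mm = f"$month%02d"
      val dayList = ds.map(d => f"${d.getDayOfMonth}%02d").mkString(",")
      s"/*/$year/$mm/{$dayList}/*/*/*.avro"
    }
}

val patterns = globsForRange(LocalDate.of(2020, 3, 26), LocalDate.of(2020, 4, 10))
patterns.foreach(println)
```

For the example range this yields two patterns, one for March 26 to 31 and one for April 01 to 10, which can be appended to the root path.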
Solution 1
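val path1 = "/mnt/twiterData/data/stream/*/2020/{03}/{28,29,30}/*/*/*.avro"
// path2 was not shown in the original snippet; assumed here to be the April pattern from Solution 2
val path2 = "/mnt/twiterData/data/stream/*/2020/04/{01,02,03}/*/*/*.avro"
val avroDf = spark.read.format("com.databricks.spark.avro").load(path1, path2)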
The problem is that this throws an error if any of the matched folders or files are empty. The next solution is to resolve the file paths first using SparkHadoopUtil. When we pass it the glob path, it walks the base path and returns the list of file paths that actually exist, so only valid paths are handed to the reader.
Solution 2
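import java.net.URI
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

// Expands the glob under the base path and returns the files that actually exist.
// Relies on Spark internals (SparkHadoopUtil, InMemoryFileIndex), so it may need
// adjusting for the Databricks runtime version in use.
def listFiles(basep: String, globp: String): Seq[String] = {
  val conf = new Configuration(sc.hadoopConfiguration)
  val fs = FileSystem.get(new URI(basep), conf)

  // Make sure the path starts with "/" so the two parts can be merged
  def validated(path: String): Path = {
    if (path startsWith "/") new Path(path)
    else new Path("/" + path)
  }

  val fileCatalog = InMemoryFileIndex.bulkListLeafFiles(
    paths = SparkHadoopUtil.get.globPath(fs, Path.mergePaths(validated(basep), validated(globp))),
    hadoopConf = conf,
    filter = null,
    sparkSession = spark)

  // Flatten to plain path strings (on newer runtimes the entries may be
  // FileStatus objects, in which case _.getPath.toString is needed instead)
  fileCatalog.flatMap(_._2.map(_.path))
}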
val rootPath = "/mnt/twiterData/data/stream/"
val filePathFormat = "/*/2020/04/{01,02,03}/*/*/*.avro"
val paths = listFiles(rootPath, filePathFormat)
val allFilesPath = paths.toSeq
val avroDf = spark.read.format("com.databricks.spark.avro").load(allFilesPath: _*)
If you have several glob patterns (for example one per month), iterate over them one by one, collect the file paths into a single list, and then pass that list to the Spark reader.
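A rough sketch of that last step might look like the following. The helper collectPaths and the stand-in lister are hypothetical and exist only so the snippet runs on its own; in the notebook the lister would be p => listFiles(rootPath, p).

```scala
// Hypothetical helper: expands every glob pattern with the supplied lister
// and merges the results into one de-duplicated list of paths.
def collectPaths(patterns: Seq[String], lister: String => Seq[String]): Seq[String] =
  patterns.flatMap(lister).distinct

// Stand-in lister with made-up paths, for illustration only.
// In the notebook, use: p => listFiles(rootPath, p)
val fakeLister: String => Seq[String] = Map(
  "/*/2020/03/{30,31}/*/*/*.avro" -> Seq("/s1/2020/03/30/a.avro", "/s1/2020/03/31/b.avro"),
  "/*/2020/04/{01,02}/*/*/*.avro" -> Seq("/s1/2020/04/01/c.avro")
).withDefaultValue(Seq.empty[String])

val patterns = Seq(
  "/*/2020/03/{30,31}/*/*/*.avro",
  "/*/2020/04/{01,02}/*/*/*.avro",
  "/*/2020/05/{01}/*/*/*.avro" // matches nothing, so it contributes no paths
)

val allPaths = collectPaths(patterns, fakeLister)

// In the notebook, the merged list would then go to the reader:
// val avroDf = spark.read.format("com.databricks.spark.avro").load(allPaths: _*)
```

A pattern that matches nothing simply adds no paths. It is probably worth checking that the merged list is not empty before calling load.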