jonas opened this issue 8 years ago
Thanks for bringing this up. We thought about this, but it would require a lot more work to implement TableSchema/TableRow <-> DataSource schema and row converters. We don't think we have enough bandwidth to tackle this, but we'd surely appreciate it if you can look into it.
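For reference, the read-direction half of that mapping (BigQuery TableSchema to Spark StructType) could look roughly like this; a minimal sketch that only handles flat scalar fields and ignores RECORD/REPEATED columns:

import scala.collection.JavaConverters._
import com.google.api.services.bigquery.model.TableSchema
import org.apache.spark.sql.types._

// Sketch: map a BigQuery TableSchema to a Spark StructType.
// Only flat scalar fields are handled; RECORD/REPEATED fields would
// need recursion into ArrayType/StructType.
def toStructType(tableSchema: TableSchema): StructType = {
  val fields = tableSchema.getFields.asScala.map { f =>
    val dataType = f.getType match {
      case "BOOLEAN"   => BooleanType
      case "INTEGER"   => LongType
      case "FLOAT"     => DoubleType
      case "STRING"    => StringType
      case "TIMESTAMP" => TimestampType
      case other       => throw new IllegalArgumentException(s"Unsupported type: $other")
    }
    StructField(f.getName, dataType, nullable = f.getMode != "REQUIRED")
  }
  StructType(fields)
}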
@nevillelyh Thanks for the quick reply. Yes, this was definitely something I was planning to contribute.
Hey @jonas @nevillelyh
I am facing a similar problem and would love to contribute.
So for us to do this we need to: 1) implement converters that take in a DataFrame and extract its schema, 2) produce a TableSchema from that DataFrame schema, and 3) use the TableSchema to create a table definition and save directly to BQ.
Am I correct?
Yes. It'd be great if you two could collaborate on that. We could also remove the spark-avro dependency and use Hadoop's Avro{Input,Output}Format directly.
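For what it's worth, reading BigQuery-exported Avro files without spark-avro could look something like the sketch below, using AvroKeyInputFormat through SparkContext.newAPIHadoopFile; the path handling is a placeholder:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Read Avro files on GCS as GenericRecords, bypassing spark-avro.
// Note: the input format may reuse record instances, so copy them
// before caching or collecting.
def readAvro(sc: SparkContext, gcsPath: String): RDD[GenericRecord] =
  sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](gcsPath)
    .map { case (key, _) => key.datum() }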
Ok no worries. @jonas give me a shout on here or twitter so we can work something out :)
Yes, that would be great. We are currently using JSON instead of Avro to load data into BQ, to work around an issue where the fields would have root in their name. Here's our simple schema converter, which doesn't support nested (structural) types:
// Method on a DataFrame wrapper: `self` is the wrapped DataFrame, `conf` the
// Hadoop configuration, `bq` our BigQuery client wrapper and `delete` a helper
// that removes the temporary GCS path.
def saveAsBigQueryTable(tableRef: TableReference,
                        writeDisposition: WriteDisposition.Value,
                        createDisposition: CreateDisposition.Value): Unit = {
  val bucket = conf.get(BigQueryConfiguration.GCS_BUCKET_KEY)
  val temp = s"spark-bigquery-${System.currentTimeMillis()}-${Random.nextInt(Int.MaxValue)}"
  val gcsPath = s"gs://$bucket/hadoop/tmp/spark-bigquery/$temp"

  // Stage the DataFrame as newline-delimited JSON on GCS.
  self.write.json(gcsPath)

  // Map the Spark schema to a BigQuery TableSchema (flat fields only).
  val schemaFields = self.schema.fields.map { field =>
    val fieldType = field.dataType match {
      case BooleanType    => "BOOLEAN"
      case LongType       => "INTEGER"
      case IntegerType    => "INTEGER"
      case StringType     => "STRING"
      case DoubleType     => "FLOAT"
      case TimestampType  => "TIMESTAMP"
      case _: DecimalType => "INTEGER"
    }
    new TableFieldSchema().setName(field.name).setType(fieldType)
  }.toList
  val tableSchema = new TableSchema().setFields(schemaFields)

  // Run a BigQuery load job from the staged JSON, then clean up the temp path.
  bq.load(gcsPath, tableRef, tableSchema, writeDisposition, createDisposition)
  delete(new Path(gcsPath))
}
This ticket is to discuss whether there is interest in migrating this project to provide a more "native" integration via Spark's DataSource API. From looking at spark-avro this looks doable, and I think it would make it easier to deal with tickets #2 and #3.
This would also make it possible to provide different ways of loading data, for example via JSON or Avro, or using BQ partitions, based on user preference. We are currently using JSON internally to work around what seems to be a BigQuery Avro bug: https://code.google.com/p/google-bigquery/issues/detail?id=549.
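To make this concrete, here is a minimal sketch of what the skeleton could look like against the data source v1 interfaces; the class names and options are made up, and all BigQuery-specific logic is left as TODOs:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType

// Hypothetical entry point, e.g.
// df.write.format("bigquery").option("table", "project:dataset.table").save()
class DefaultSource extends RelationProvider with CreatableRelationProvider {

  // Read path: resolve an existing table into a relation.
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new BigQueryRelation(sqlContext, parameters("table"))

  // Write path: stage the DataFrame and trigger a load job.
  override def createRelation(sqlContext: SQLContext,
                              mode: SaveMode,
                              parameters: Map[String, String],
                              data: DataFrame): BaseRelation = {
    // TODO: stage `data` on GCS (JSON or Avro) and run a BigQuery load job,
    // mapping `mode` to write/create dispositions.
    new BigQueryRelation(sqlContext, parameters("table"))
  }
}

class BigQueryRelation(val sqlContext: SQLContext, table: String)
    extends BaseRelation with TableScan {

  // TODO: fetch the table's schema from BigQuery and convert it to a StructType.
  override def schema: StructType = StructType(Nil)

  // TODO: export the table to GCS (or read it via the Hadoop BigQuery input
  // format) and build an RDD[Row] from the result.
  override def buildScan(): RDD[Row] = sqlContext.sparkContext.emptyRDD[Row]
}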