Skip to content

Instantly share code, notes, and snippets.

@davisp
Created August 28, 2025 17:48
Show Gist options
  • Save davisp/348b69c36200cff9e298111b7ac9834f to your computer and use it in GitHub Desktop.
Save davisp/348b69c36200cff9e298111b7ac9834f to your computer and use it in GitHub Desktop.
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::Session;
use datafusion::catalog::TableFunctionImpl;
use datafusion::common::arrow::datatypes::Schema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::{Expr, TableType};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
// To define your own table function, you only need to do the following 3 things:
// 1. Implement your own [`TableProvider`]
// 2. Implement your own [`TableFunctionImpl`] and return your [`TableProvider`]
// 3. Register the function using [`SessionContext::register_udtf`]
/// Registers the `gap_fill` table function and invokes it from SQL.
///
/// The statements are run one after another against a single session:
/// first a `test` table is created, then `gap_fill` is called once with a
/// qualified column argument and once with a subquery argument. The result
/// of every statement is displayed with `DataFrame::show`.
///
/// # Errors
///
/// Returns the first error produced while planning, executing, or showing
/// any of the statements.
#[tokio::main]
async fn main() -> Result<()> {
    // Local execution context shared by every statement below.
    let ctx = SessionContext::new();

    // Expose the table function to SQL under the name `gap_fill`.
    ctx.register_udtf("gap_fill", Arc::new(GapFillTableFunc {}));

    // Executed strictly in order: the table must exist before it is referenced.
    let statements = [
        "CREATE TABLE test(ts TIMESTAMP, value BIGINT);",
        "SELECT * FROM gap_fill(test.ts, INTERVAL 1 seconds);",
        "SELECT * FROM gap_fill((SELECT * FROM test), ts, INTERVAL 1 seconds);",
    ];
    for statement in statements {
        ctx.sql(statement).await?.show().await?;
    }

    Ok(())
}
/// Table function registered as `gap_fill` by `main`.
///
/// It carries no state; its only purpose here is to show which argument
/// expressions the table function receives.
#[derive(Debug)]
struct GapFillTableFunc {}

impl TableFunctionImpl for GapFillTableFunc {
    /// Dumps the received argument expressions to stderr and returns a
    /// [`MyTable`] with an empty schema and no record batches.
    ///
    /// The expressions are only printed; they do not influence the returned
    /// table.
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        // Pretty-printed debug dump of the arguments.
        eprintln!("exprs: {exprs:#?}");

        Ok(Arc::new(MyTable {
            schema: Arc::new(Schema::empty()),
            batches: Vec::new(),
        }))
    }
}
/// In-memory table made of a schema and the record batches to serve.
#[derive(Debug)]
struct MyTable {
    /// Schema reported by [`TableProvider::schema`] and handed to the scan.
    schema: SchemaRef,
    /// Batches returned by a scan; cloned on every call to `scan`.
    batches: Vec<RecordBatch>,
}

#[async_trait]
impl TableProvider for MyTable {
    /// Returns `self` as [`std::any::Any`] so callers can downcast.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Returns a shared handle to the table's schema.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Always reports [`TableType::Base`].
    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Builds a memory-backed execution plan over the stored batches.
    ///
    /// All batches are passed as one element of the partition slice, and
    /// `projection` is forwarded unchanged. The session state, filters and
    /// limit are not used.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `MemorySourceConfig::try_new_exec`.
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let partitions = [self.batches.clone()];
        let table_schema = Arc::clone(&self.schema);
        let plan: Arc<dyn ExecutionPlan> =
            MemorySourceConfig::try_new_exec(&partitions, table_schema, projection.cloned())?;
        Ok(plan)
    }
}
++
++
exprs: [
Column(
Column {
relation: Some(
Bare {
table: "test",
},
),
name: "ts",
},
),
Literal(
IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 1000000000 }"),
None,
),
]
++
++
exprs: [
Column(
Column {
relation: None,
name: "ts",
},
),
Literal(
IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 1000000000 }"),
None,
),
]
++
++
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment