[New page]: R UDFs #146

Open · 1 task done
emercado4 opened this issue Oct 24, 2024 · 0 comments
Labels
new page Suggestions for new topics and pages

Comments

emercado4 (Contributor) commented Oct 24, 2024

Summary of Content

After migrating to CDP we will be able to use R UDFs with sparklyr's spark_apply function. We should add a page of guidance on how to do this, similar to the Pandas UDF page that already exists.

This chapter of Mastering Spark with R is very helpful: https://therinspark.com/distributed.html
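
For the page's intro, something like this minimal sketch might work (my sketch, not from the chapter; it assumes an existing sparklyr connection sc):

library(sparklyr)

# spark_apply ships an arbitrary R function to the workers and applies it
# to each partition of a Spark data frame
sdf_len(sc, 3) |>
  spark_apply(function(df) df * 10) |>
  collect()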

Language Version

R

Can this suggestion be used in Pyspark and / or SparklyR (Can select multiple)

SparklyR

Code snippets

# 1. Basic example

library(dplyr)
library(janitor)
library(sparklyr)

# Point the workers at the library path where your R packages are installed
default_config <- spark_config()
default_config["spark.r.libpaths"] <- '/home/cdsw/.local/lib/R/4.3/library'

# Depending on your environment, spark_connect() may also need a master
# argument (e.g. master = "yarn")
sc <- spark_connect(
  app_name = "testing-spark",
  config = default_config)

# spark_apply call to load libraries onto the worker nodes
libload <- function() {
  library(dplyr)
}

# Packages are loaded onto the worker nodes during the session's first call
# to spark_apply, so it is more efficient to trigger this on a minimal sdf first
sdf_len(sc, 1) |>
  sparklyr::spark_apply(f = libload, packages = FALSE)

# Simple UDF example
# Generate an sdf to run on
sdf <- sparklyr::sdf_seq(sc, -7, 8, 2) |>
  dplyr::mutate(half_id = id / 2) |>
  dplyr::select(half_id)

sdf |>
  sparklyr::collect() |>
  print()

# Define the UDF
round_udf <- function(df) {
  x <- df |>
    dplyr::mutate(rounded_col = round(half_id)) # the dplyr package must be specified explicitly inside the UDF
  return(x)
}

# Run the UDF using spark_apply
rounded <- sparklyr::spark_apply(sdf,
  f = round_udf,
  # Supplying a schema is faster, as sparklyr does not have to infer one
  columns = c(half_id = "double",
              rounded_col = "double"),
  packages = FALSE)

# Check it has worked
rounded

spark_disconnect(sc)
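
Two things probably worth flagging on the page (my additions, not from the chapter): when columns is omitted, sparklyr has to work out the output schema itself, which is why supplying it explicitly is faster; and base R's round() rounds halves to even (round(0.5) is 0, round(1.5) is 2), so the rounded_col values may surprise readers coming from other languages.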

# 2. Example of passing additional arguments using 'context'
default_config <- spark_config()
default_config["spark.r.libpaths"] <- .libPaths()[1]

sc <- spark_connect(
  app_name = "udf-args-test",
  config = default_config)

libload <- function() {
  library(dplyr)
  library(janitor)
  library(rlang)
}

sdf_len(sc, 1) |>
  sparklyr::spark_apply(f = libload, packages = FALSE)


sdf <- sparklyr::sdf_seq(sc, -7, 8, 2) |>
  dplyr::mutate(half_id = id / 2)

sdf |>
  sparklyr::collect() |>
  print()

# UDF that takes the column to round as an extra argument
round_udf <- function(df, col) {
  x <- df |>
    dplyr::mutate(rounded_col = round(!!rlang::sym(col))) # packages must be specified explicitly inside the UDF
  return(x)
}


# The context object is serialised and passed to the UDF as its second
# argument, so round_udf receives col = "half_id"
rounded <- sparklyr::spark_apply(sdf,
  f = round_udf,
  context = "half_id",
  packages = FALSE)

rounded

spark_disconnect(sc)
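
The page could also cover grouped UDFs, which the linked chapter demonstrates: spark_apply takes a group_by argument, so the function runs once per group rather than once per partition. A minimal sketch (mine, not from the chapter), assuming an active connection sc as above; the sign column and summarise_group function are purely illustrative:

# Group the data, then run a UDF once per group
sdf_grouped <- sparklyr::sdf_seq(sc, -7, 8, 2) |>
  dplyr::mutate(sign = ifelse(id < 0, "neg", "pos"))

summarise_group <- function(df) {
  # df holds all the rows for one group
  data.frame(n = nrow(df), mean_id = mean(df$id))
}

sdf_grouped |>
  sparklyr::spark_apply(
    f = summarise_group,
    group_by = "sign",
    packages = FALSE) |>
  sparklyr::collect()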

Code of Conduct

  • I agree to follow this project's Code of Conduct
emercado4 added the new page label Oct 24, 2024