Skip to content

Commit

Permalink
update dev scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
stemangiola committed Nov 15, 2024
1 parent 2f66076 commit 09ad7b3
Show file tree
Hide file tree
Showing 5 changed files with 882 additions and 368 deletions.
19 changes: 12 additions & 7 deletions dev/cellxgene_to_metadata.R
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,26 @@ tar_script({
slurm_cpus_per_task = 1,
workers = 200,
tasks_max = 5,
verbose = T
verbose = T,
seconds_idle = 30
),
crew_controller_slurm(
name = "slurm_1_10",
slurm_memory_gigabytes_per_cpu = 10,
slurm_cpus_per_task = 1,
workers = 100,
tasks_max = 5,
verbose = T
verbose = T,
seconds_idle = 30
),
crew_controller_slurm(
name = "slurm_1_20",
slurm_memory_gigabytes_per_cpu = 20,
slurm_cpus_per_task = 1,
workers = 100,
tasks_max = 5,
verbose = T,
verbose = T, ,
seconds_idle = 30
script_directory = paste0("/vast/scratch/users/mangiola.s/cellxgenedp/crew_cluster/", basename(tempdir()))
),
crew_controller_slurm(
Expand All @@ -76,23 +79,26 @@ tar_script({
slurm_cpus_per_task = 1,
workers = 50,
tasks_max = 5,
verbose = T
verbose = T,
seconds_idle = 30
),
crew_controller_slurm(
name = "slurm_1_80",
slurm_memory_gigabytes_per_cpu = 80,
slurm_cpus_per_task = 1,
workers = 30,
tasks_max = 5,
verbose = T
verbose = T,
seconds_idle = 30
),
crew_controller_slurm(
name = "slurm_1_200",
slurm_memory_gigabytes_per_cpu = 200,
slurm_cpus_per_task = 1,
workers = 5,
tasks_max = 5,
verbose = T
verbose = T,
seconds_idle = 30
)
),
resources = tar_resources(crew = tar_resources_crew("slurm_1_10"))
Expand Down Expand Up @@ -901,7 +907,6 @@ age_days_tbl |>




# #
# cell_ids_for_metadata <- tbl(
# dbConnect(duckdb::duckdb(), dbdir = ":memory:"),
Expand Down
129 changes: 87 additions & 42 deletions dev/execute_hpcell_on_census_and_defining_data_tranformation.R
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
library(glue)
library(dplyr)
library(tibble)
library(glue)
library(purrr)
library(targets)
library(stringr)
library(HPCell)
library(crew.cluster)
library(arrow)
library(CuratedAtlasQueryR)
directory = "/vast/scratch/users/shen.m/Census_rerun/split_h5ad_based_on_sample_id/"
sample_anndata <- dir(glue("{directory}"), full.names = T)
downloaded_samples_tbl <- read_parquet("/vast/scratch/users/shen.m/Census_rerun/census_samples_to_download_groups.parquet")
downloaded_samples_tbl <- downloaded_samples_tbl |>
rename(cell_number = list_length) |>
dplyr::rename(cell_number = list_length) |>
mutate(cell_number = cell_number |> as.integer(),
file_name = glue("{directory}{sample_2}.h5ad") |> as.character(),
tier = case_when(
Expand Down Expand Up @@ -104,10 +101,9 @@ job::job({

sample_names |>
initialise_hpc(
store = "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store/",
gene_nomenclature = "ensembl",
data_container_type = "anndata",
store = "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store",
# store = "/vast/projects/cellxgene_curated/census_hpcell_oct_2024/target_store_1_20",
tier = tiers,
computing_resources = list(
crew_controller_slurm(
Expand All @@ -117,7 +113,8 @@ job::job({
workers = 300,
tasks_max = 10,
verbose = T,
launch_max = 5
launch_max = 10,
seconds_idle = 30
),

crew_controller_slurm(
Expand All @@ -127,7 +124,8 @@ job::job({
workers = 200,
tasks_max = 10,
verbose = T,
launch_max = 5
launch_max = 10,
seconds_idle = 30
),
crew_controller_slurm(
name = "tier_3",
Expand All @@ -136,7 +134,8 @@ job::job({
workers = 100,
tasks_max = 10,
verbose = T,
launch_max = 5
launch_max = 5,
seconds_idle = 30
),
crew_controller_slurm(
name = "tier_4",
Expand All @@ -145,18 +144,19 @@ job::job({
workers = 100,
tasks_max = 10,
verbose = T,
launch_max = 5
launch_max = 5,
seconds_idle = 30
)
),
verbosity = "verbose_positives",
verbosity = "summary",
# debug_step = "annotation_tbl_tier_4",
update = "never",
error = "continue",
garbage_collection = 100,
workspace_on_error = TRUE

) |>
tranform_assay(fx = functions, target_output = "sce_transformed") |>
transform_assay(fx = functions, target_output = "sce_transformed") |>

# # Remove empty outliers based on RNA count threshold per cell
remove_empty_threshold(target_input = "sce_transformed", RNA_feature_threshold = 200) |>
Expand All @@ -171,7 +171,7 @@ job::job({



tar_meta(starts_with("annotation_tbl_tier_4"), store = "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store") |>
tar_meta(starts_with("annotation_tbl_"), store = "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store") |>
filter(!data |> is.na()) |> arrange(desc(time)) |> select(error, name)

# I have to check the input of this NULL target
Expand Down Expand Up @@ -207,7 +207,6 @@ annotation_label_transfer(sce_transformed_tier_4, empty_droplets_tbl = empty_tbl
#'
#' @example Usage:
#' The pipeline script is saved as `/vast/scratch/users/mangiola.s/lighten_annotation_tbl_target.R` and can be run using `tar_make()`.

tar_script({
library(dplyr)
library(magrittr)
Expand All @@ -218,41 +217,44 @@ tar_script({
library(crew.cluster)
tar_option_set(
memory = "transient",
garbage_collection = 1000,
garbage_collection = 100,
storage = "worker",
retrieval = "worker",
error = "continue",
debug = "annotation_tbl_light",
#debug = "annotation_tbl_light",
cue = tar_cue(mode = "never"),
controller = crew_controller_group(
list(
crew_controller_slurm(
name = "tier_1",
script_lines = "#SBATCH --mem 8G",
slurm_cpus_per_task = 1,
workers = 500,
workers = 200,
tasks_max = 10,
verbose = T,
launch_max = 5
launch_max = 5,
seconds_idle = 30
),

crew_controller_slurm(
name = "tier_2",
script_lines = "#SBATCH --mem 10G",
slurm_cpus_per_task = 1,
workers = 300,
workers = 200,
tasks_max = 10,
verbose = T,
launch_max = 5
launch_max = 5,
seconds_idle = 30
),
crew_controller_slurm(
name = "tier_3",
script_lines = "#SBATCH --mem 15G",
slurm_cpus_per_task = 1,
workers = 300,
workers = 200,
tasks_max = 10,
verbose = T,
launch_max = 5
launch_max = 5,
seconds_idle = 30
),
crew_controller_slurm(
name = "tier_4",
Expand All @@ -261,16 +263,21 @@ tar_script({
workers = 30,
tasks_max = 10,
verbose = T,
launch_max = 5
launch_max = 5,
seconds_idle = 30
)
)
),
trust_object_timestamps = TRUE
trust_object_timestamps = TRUE,
workspaces = "annotation_tbl_light_ffcd3d5a64bedf1f"
)

lighten_annotation = function(target_name, my_store ){
annotation_tbl = tar_read_raw( target_name, store = my_store )
if(annotation_tbl |> is.null()) return(NULL)
if(annotation_tbl |> is.null()) {
warning("this annotation is null -> ", target_name)
return(NULL)
}

annotation_tbl |>
unnest(blueprint_scores_fine) |>
Expand Down Expand Up @@ -314,7 +321,7 @@ job::job({
tar_make(
script = "/vast/scratch/users/mangiola.s/lighten_annotation_tbl_target.R",
store = "/vast/scratch/users/mangiola.s/lighten_annotation_tbl_target",
reporter = "summary", callr_function = NULL
reporter = "summary"
)

})
Expand All @@ -324,31 +331,69 @@ library(arrow)
library(dplyr)
library(duckdb)

# Write annotation light
tar_read(annotation_tbl_light, store = "/vast/scratch/users/mangiola.s/lighten_annotation_tbl_target") |>
rename(
blueprint_first_labels_fine = blueprint_first.labels.fine,
monaco_first_labels_fine = monaco_first.labels.fine,
azimuth_predicted_celltype_l2 = azimuth_predicted.celltype.l2
) |>
write_parquet("/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/annotation_tbl_light.parquet")
write_parquet_to_parquet = function(data_tbl, output_parquet, compression = "gzip") {

# Establish connection to DuckDB in-memory database
con_write <- dbConnect(duckdb::duckdb(), dbdir = ":memory:")

# Register `data_tbl` within the DuckDB connection (this doesn't load it into memory)
duckdb::duckdb_register(con_write, "data_tbl_view", data_tbl)

# Use DuckDB's COPY command to write `data_tbl` directly to Parquet with compression
copy_query <- paste0("
COPY data_tbl_view TO '", output_parquet, "' (FORMAT PARQUET, COMPRESSION '", compression, "');
")

# Execute the COPY command
dbExecute(con_write, copy_query)

# Unregister the temporary view
duckdb::duckdb_unregister(con_write, "data_tbl_view")

# Disconnect from the database
dbDisconnect(con_write, shutdown = TRUE)
}

cell_metadata <- tbl(
dbConnect(duckdb::duckdb(), dbdir = ":memory:"),
sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_metadata.parquet')")
) |>

# Write annotation light
cell_metadata <-
tbl(
dbConnect(duckdb::duckdb(), dbdir = ":memory:"),
sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_metadata.parquet')")
) |>
mutate(cell_ = paste0(cell_, "___", dataset_id)) |>
select(cell_, observation_joinid, contains("cell_type"), dataset_id, self_reported_ethnicity, tissue, donor_id, sample_id, is_primary_data, assay)


cell_annotation =
tar_read(annotation_tbl_light, store = "/vast/scratch/users/mangiola.s/lighten_annotation_tbl_target") |>
rename(
blueprint_first_labels_fine = blueprint_first.labels.fine,
monaco_first_labels_fine = monaco_first.labels.fine,
azimuth_predicted_celltype_l2 = azimuth_predicted.celltype.l2
)

empty_droplet =
tar_read(empty_tbl_tier_1, store = "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store") |>
c(tar_read(empty_tbl_tier_2, store = "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store")) |>
c(tar_read(empty_tbl_tier_3, store = "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store")) |>
c(tar_read(empty_tbl_tier_4, store = "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store")) |>
bind_rows() |>
rename(cell_ = .cell)


tar_read(annotation_tbl_light, store = "/vast/scratch/users/mangiola.s/lighten_annotation_tbl_target") |>
left_join(cell_metadata, copy = TRUE) |>
write_parquet("/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_annotation.parquet")
cell_metadata |>
left_join(empty_droplet, copy=TRUE) |>
left_join(cell_annotation, copy=TRUE) |>
write_parquet_to_parquet("/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_annotation.parquet")

system("~/bin/rclone copy /vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_annotation.parquet box_adelaide:/Mangiola_ImmuneAtlas/reannotation_consensus/")


tar_workspace(
"annotation_tbl_light_ffcd3d5a64bedf1f",
script = "/vast/scratch/users/mangiola.s/lighten_annotation_tbl_target.R",
store = "/vast/scratch/users/mangiola.s/lighten_annotation_tbl_target",
)

# tar_workspaces(annotation_tbl_light_c8078b8175604dd3, store = "/vast/scratch/users/mangiola.s/lighten_annotation_tbl_target")

Expand Down
Loading

0 comments on commit 09ad7b3

Please sign in to comment.