Skip to content

Commit

Permalink
logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Daan de Boer committed Mar 22, 2024
1 parent a1117b4 commit 25068d8
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 34 deletions.
60 changes: 59 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition = "2021"


[dependencies]
anyhow = "1.0.81"
log = "0.4.16"
simple_logger = "4.3.3"
chrono = "0.4.19"
Expand All @@ -23,3 +24,4 @@ base64 = "0.22.0"
hmac = "0.12.1"
sha2 = "0.10.8"
async-trait = "0.1.77"
simple-logging = "2.0.2"
Binary file modified Release/Linux/OfficeAuditLogCollector
100755 → 100644
Binary file not shown.
Binary file modified Release/Windows/OfficeAuditLogCollector.exe
Binary file not shown.
105 changes: 89 additions & 16 deletions src/api_connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use reqwest;
use log::{debug, warn, error};
use log::{debug, warn, error, info};
use reqwest::header::{AUTHORIZATION, CONTENT_TYPE, HeaderMap};
use tokio;
use serde_json;
Expand All @@ -9,17 +9,19 @@ use futures::channel::mpsc::{Receiver, Sender};
use crate::config::Config;
use crate::data_structures::{JsonList, StatusMessage, GetBlobConfig, GetContentConfig, AuthResult,
ContentToRetrieve, CliArgs};
use anyhow::Result;
use serde_json::Value;


/// Return a logged in API connection object. Use the Headers value to make API requests.
pub fn get_api_connection(args: CliArgs, config: Config) -> ApiConnection {
pub async fn get_api_connection(args: CliArgs, config: Config) -> ApiConnection {

let mut api = ApiConnection {
args,
config,
headers: HeaderMap::new(),
};
api.login();
api.login().await;
api
}

Expand All @@ -34,7 +36,8 @@ pub struct ApiConnection {
impl ApiConnection {
/// Use tenant_id, client_id and secret_key to request a bearer token and store it in
/// our headers. Must be called once before requesting any content.
fn login(&mut self) {
async fn login(&mut self) {
info!("Logging in to Office Management API.");
let auth_url = format!("https://login.microsoftonline.com/{}/oauth2/token",
self.args.tenant_id.to_string());

Expand All @@ -48,43 +51,113 @@ impl ApiConnection {

self.headers.insert(CONTENT_TYPE, "application/x-www-form-urlencoded".parse().unwrap());

let login_client = reqwest::blocking::Client::new();
let json: AuthResult = login_client
let login_client = reqwest::Client::new();
let result = login_client
.post(auth_url)
.headers(self.headers.clone())
.form(&params)
.send()
.unwrap_or_else(|e| panic!("Could not send API login request: {}", e))
.json()
.unwrap_or_else(|e| panic!("Could not parse API login reply: {}", e));
.await;
let response = match result {
Ok(response) => response,
Err(e) => {
let msg = format!("Could not send API login request: {}", e);
error!("{}", msg);
panic!("{}", msg);
}
};
if !response.status().is_success() {
let text = match response.text().await {
Ok(text) => text,
Err(e) => {
let msg = format!("Received error response to API login, but could not parse response: {}", e);
error!("{}", msg);
panic!("{}", msg);
}
};
let msg = format!("Received error response to API login: {}", text);
error!("{}", msg);
panic!("{}", msg);
}
let json = match response.json::<AuthResult>().await {
Ok(json) => json,
Err(e) => {
let msg = format!("Could not parse API login reply: {}", e);
error!("{}", msg);
panic!("{}", msg);
}
};

let token = format!("bearer {}", json.access_token);
self.headers.insert(AUTHORIZATION, token.parse().unwrap());
info!("Successfully logged in to Office Management API.")
}

fn get_base_url(&self) -> String {
format!("https://manage.office.com/api/v1.0/{}/activity/feed", self.args.tenant_id)
}

pub fn subscribe_to_feeds(&self) {
pub async fn subscribe_to_feeds(&self) -> Result<()> {

let content_types = self.config.collect.content_types.get_content_type_strings();
info!("Subscribing to audit feeds.");
let mut content_types = self.config.collect.content_types.get_content_type_strings();

let client = reqwest::blocking::Client::new();
let client = reqwest::Client::new();
info!("Getting current audit feed subscriptions.");
let url = format!("{}/subscriptions/list", self.get_base_url());
let result: Vec<HashMap<String, Value>> = client
.get(url)
.headers(self.headers.clone())
.header("content-length", 0)
.send()
.await?
.json()
.await?;
for subscription in result {
let status = subscription
.get("status")
.expect("No status in JSON")
.as_str()
.unwrap()
.to_string()
.to_lowercase();
if status == "enabled" {
let content_type = subscription
.get("contentType")
.expect("No contentType in JSON")
.as_str()
.unwrap()
.to_string()
.to_lowercase();
if let Some(i) = content_types
.iter()
.position(|x| x.to_lowercase() == content_type) {
info!("Already subscribed to feed {}", content_type);
content_types.remove(i);
}
}
}
for content_type in content_types {
let url = format!("{}/subscriptions/start?contentType={}",
self.get_base_url(),
content_type
);
client
debug!("Subscribing to {} feed.", content_type);
let response = client
.post(url)
.headers(self.headers.clone())
.header("content-length", 0)
.send()
.unwrap_or_else(
|e| panic!("Error setting feed subscription status {}", e)
);
.await?;
if !response.status().is_success() {
let text = response.text().await?;
let msg = format!("Received error response subscribing to audit feed {}: {}", content_type, text);
error!("{}", msg);
panic!("{}", msg);
}
}
info!("All audit feeds subscriptions exist.");
Ok(())
}


Expand Down
10 changes: 7 additions & 3 deletions src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct Collector {

impl Collector {

pub fn new(args: CliArgs, config: Config, runs: HashMap<String, Vec<(String, String)>>) -> Collector {
pub async fn new(args: CliArgs, config: Config, runs: HashMap<String, Vec<(String, String)>>) -> Collector {

// Initialize interfaces
let mut interfaces: Vec<Box<dyn Interface>> = Vec::new();
Expand All @@ -63,8 +63,12 @@ impl Collector {
// Initialize collector threads
let api = api_connection::get_api_connection(
args.clone(), config.clone()
);
api.subscribe_to_feeds();
).await;
if let Err(e) = api.subscribe_to_feeds().await {
let msg = format!("Error subscribing to audit feeds: {}", e);
error!("{}", msg);
panic!("{}", msg);
}

let known_blobs = config.load_known_blobs();
let (result_rx, stats_rx, kill_tx) =
Expand Down
40 changes: 27 additions & 13 deletions src/interfaces/azure_oms_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use base64::prelude::BASE64_STANDARD;
use chrono::Utc;
use futures::{stream, StreamExt};
use hmac::{Hmac, Mac};
use log::warn;
use log::{error, info, warn};
use sha2::Sha256;
use crate::config::Config;
use crate::data_structures::Caches;
Expand Down Expand Up @@ -56,7 +56,7 @@ impl Interface for OmsInterface {
async fn send_logs(&mut self, logs: Caches) {
let client = reqwest::Client::new();

println!("SEND");
info!("Sending logs to OMS interface.");
let mut requests = Vec::new();
for (content_type, content_logs) in logs.get_all_types() {
for log in content_logs.iter() {
Expand All @@ -75,33 +75,47 @@ impl Interface for OmsInterface {
}
}

let resource = "/api/logs";
let uri = format!("https://{}.ods.opinsights.azure.com{}?api-version=2016-04-01",
self.config.output.oms.as_ref().unwrap().workspace_id, resource);

info!("URL for OMS calls will be: {}", uri);
let calls = stream::iter(requests)
.map(|(body, table_name, time_value, content_length)| {
let client = client.clone();
let rfc1123date = Utc::now().format("%a, %d %b %Y %H:%M:%S GMT");
let uri = uri.clone();
let method = "POST".to_string();
let content_type = "application/json".to_string();
let resource = "/api/logs".to_string();
let signature = self.build_signature(rfc1123date.to_string(), content_length,
method.clone(), content_type.to_string(),
let rfc1123date = Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string();
let signature = self.build_signature(rfc1123date.clone(), content_length,
method.clone(), content_type.clone(),
resource.to_string());


let uri = format!("https://{}.ods.opinsights.azure.com{}?api-version=2016-04-01",
self.config.output.oms.as_ref().unwrap().workspace_id, resource);
tokio::spawn(async move {
let resp = client
let result = client
.post(uri)
.header("content-type", "application/json")
.header("content-length", content_length)
.header("Authorization", signature)
.header("Log-Type", table_name)
.header("x-ms-date", rfc1123date.to_string())
.header("x-ms-date", rfc1123date.clone())
.header("time-generated-field", time_value)
.body(body)
.send()
.await.unwrap();
resp.bytes().await
.await;
match result {
Ok(response) => {
if !response.status().is_success() {
match response.text().await {
Ok(text) => error!("Error response after sending log to OMS: {}", text),
Err(e) => error!("Error response after sending log to OMS, but could not parse response: {}", e),
}
}
},
Err(e) => {
error!("Error send log to OMS: {}", e);
}
}
})
})
.buffer_unordered(10);
Expand Down
Loading

0 comments on commit 25068d8

Please sign in to comment.