Skip to content

Commit

Permalink
Merge pull request #4349 from guardian/an/scanamo-2
Browse files Browse the repository at this point in the history
upgrade to scanamo v2.0.0 latest
  • Loading branch information
andrew-nowak authored Oct 24, 2024
2 parents f0408df + f5d75c6 commit 427322f
Show file tree
Hide file tree
Showing 14 changed files with 133 additions and 100 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ lazy val commonLib = project("common-lib").settings(
// see: https://logback.qos.ch/setup.html#janino
"org.codehaus.janino" % "janino" % "3.0.6",
"com.typesafe.play" %% "play-json-joda" % "2.9.2",
"com.gu" %% "scanamo" % "1.0.0-M8",
"org.scanamo" %% "scanamo" % "2.0.0",
// Necessary to have a mix of play library versions due to scala-java8-compat incompatibility
"com.typesafe.play" %% "play-ahc-ws" % "2.8.9",
"org.yaml" % "snakeyaml" % "1.31",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import com.amazonaws.auth.{AWSCredentialsProvider, AWSCredentialsProviderChain,
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.gu.mediaservice.lib.logging.GridLogging
trait AwsClientBuilderUtils extends GridLogging {

trait AwsClientV1BuilderUtils extends GridLogging {
def awsLocalEndpoint: Option[String]
def isDev: Boolean

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.gu.mediaservice.lib.aws

import com.gu.mediaservice.lib.logging.GridLogging
import software.amazon.awssdk.auth.credentials.{AwsCredentialsProvider, DefaultCredentialsProvider}
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder
import software.amazon.awssdk.regions.Region

import java.net.URI

trait AwsClientV2BuilderUtils extends GridLogging {
def awsLocalEndpointUri: Option[URI]
def isDev: Boolean

def awsRegionV2: Region = Region.EU_WEST_1

def awsCredentialsV2: AwsCredentialsProvider = DefaultCredentialsProvider.builder().profileName("media-service").build()

final def withAWSCredentialsV2[T, S <: AwsClientBuilder[S, T]](builder: AwsClientBuilder[S, T], localstackAware: Boolean = true, maybeRegionOverride: Option[Region] = None): S = {
awsLocalEndpointUri match {
case Some(endpoint) if localstackAware => {
logger.info(s"creating aws client with local endpoint $endpoint")
builder.credentialsProvider(awsCredentialsV2).endpointOverride(endpoint)
}
case _ => builder.credentialsProvider(awsCredentialsV2).region(maybeRegionOverride.getOrElse(awsRegionV2))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ case class KinesisSenderConfig(
override val awsLocalEndpoint: Option[String],
override val isDev: Boolean,
streamName: String
) extends AwsClientBuilderUtils
) extends AwsClientV1BuilderUtils

class Kinesis(config: KinesisSenderConfig) extends GridLogging{

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package com.gu.mediaservice.lib.config

import com.gu.mediaservice.lib.aws.{AwsClientBuilderUtils, KinesisSenderConfig}
import com.gu.mediaservice.lib.aws.{AwsClientV1BuilderUtils, AwsClientV2BuilderUtils, KinesisSenderConfig}
import com.gu.mediaservice.model.UsageRightsSpec
import com.typesafe.config.{Config, ConfigException}
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
import play.api.{ConfigLoader, Configuration}

import java.net.URI
import java.util.UUID
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import scala.util.Try

abstract class CommonConfig(resources: GridConfigResources) extends AwsClientBuilderUtils with StrictLogging {
abstract class CommonConfig(resources: GridConfigResources) extends AwsClientV1BuilderUtils with AwsClientV2BuilderUtils with StrictLogging {
val configuration: Configuration = resources.configuration
final val stackName = "media-service"

Expand All @@ -25,6 +26,7 @@ abstract class CommonConfig(resources: GridConfigResources) extends AwsClientBui
override val awsRegion: String = stringDefault("aws.region", "eu-west-1")

override val awsLocalEndpoint: Option[String] = if(isDev) stringOpt("aws.local.endpoint").filter(_.nonEmpty) else None
override val awsLocalEndpointUri: Option[URI] = awsLocalEndpoint.map(new URI(_))

val useLocalAuth: Boolean = isDev && boolean("auth.useLocal")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,46 +1,46 @@
package com.gu.mediaservice.lib.metadata

import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult
import com.gu.mediaservice.lib.aws.DynamoDB
import com.gu.mediaservice.lib.config.CommonConfig
import com.gu.mediaservice.model.ImageStatusRecord
import com.gu.scanamo._
import com.gu.scanamo.syntax._
import org.scanamo._
import org.scanamo.syntax._
import org.scanamo.generic.auto._
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient

import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter

class SoftDeletedMetadataTable(config: CommonConfig) extends DynamoDB[ImageStatusRecord](config, config.softDeletedMetadataTable) {
private val softDeletedMetadataTable = Table[ImageStatusRecord](table.getTableName)
class SoftDeletedMetadataTable(config: CommonConfig) {
val client = config.withAWSCredentialsV2(DynamoDbAsyncClient.builder()).build()

private val softDeletedMetadataTable = Table[ImageStatusRecord](config.softDeletedMetadataTable)

def getStatus(imageId: String)(implicit ex: ExecutionContext) = {
ScanamoAsync.exec(client)(softDeletedMetadataTable.get('id -> imageId))
ScanamoAsync(client).exec(softDeletedMetadataTable.get("id" === imageId))
}

def setStatus(imageStatus: ImageStatusRecord)(implicit ex: ExecutionContext) = {
ScanamoAsync.exec(client)(softDeletedMetadataTable.put(imageStatus))
ScanamoAsync(client).exec(softDeletedMetadataTable.put(imageStatus))
}

private def extractUnprocessedIds(results: List[BatchWriteItemResult]): List[String] =
results.flatMap(_.getUnprocessedItems.values().asScala.flatMap(_.asScala.map(_.getPutRequest.getItem.get("id").getS)))

def setStatuses(imageStatuses: Set[ImageStatusRecord])(implicit ex: ExecutionContext) = {
if (imageStatuses.isEmpty) Future.successful(List.empty)
else ScanamoAsync.exec(client)(softDeletedMetadataTable.putAll(imageStatuses)).map(extractUnprocessedIds)
def setStatuses(imageStatuses: Set[ImageStatusRecord])(implicit ex: ExecutionContext): Future[Unit] = {
if (imageStatuses.isEmpty) Future.successful(List.empty[String])
else ScanamoAsync(client).exec(softDeletedMetadataTable.putAll(imageStatuses))
}

def clearStatuses(imageIds: Set[String])(implicit ex: ExecutionContext) = {
if (imageIds.isEmpty) Future.successful(List.empty)
else ScanamoAsync.exec(client)(softDeletedMetadataTable.deleteAll('id -> imageIds)).map(extractUnprocessedIds)
else ScanamoAsync(client).exec(softDeletedMetadataTable.deleteAll("id" in imageIds))
}

def updateStatus(imageId: String, isDeleted: Boolean)(implicit ex: ExecutionContext) = {
val updateExpression = set('isDeleted -> isDeleted)
ScanamoAsync.exec(client)(
val updateExpression = set("isDeleted", isDeleted)
ScanamoAsync(client).exec(
softDeletedMetadataTable
.given(attributeExists('id))
.when(attributeExists("id"))
.update(
'id -> imageId,
key = "id" === imageId,
update = updateExpression
)
)
Expand Down
2 changes: 1 addition & 1 deletion image-loader/app/controllers/ImageLoaderController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.gu.mediaservice.lib.logging.{FALLBACK, LogMarker, MarkerMap}
import com.gu.mediaservice.lib.play.RequestLoggingFilter
import com.gu.mediaservice.lib.{DateTimeUtils, ImageIngestOperations}
import com.gu.mediaservice.model.{UnsupportedMimeTypeException, UploadInfo}
import com.gu.scanamo.error.{ConditionNotMet, ScanamoError}
import org.scanamo.{ConditionNotMet, ScanamoError}
import lib.FailureResponse.Response
import lib.imaging.{MimeTypeDetection, NoSuchImageExistsInS3, UserImageLoaderException}
import lib.storage.{ImageLoaderStore, S3FileDoesNotExistException}
Expand Down
7 changes: 2 additions & 5 deletions image-loader/app/controllers/UploadStatusController.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package controllers


import java.net.URI

import com.gu.mediaservice.lib.argo.ArgoHelpers
import com.gu.mediaservice.lib.auth.Permissions.UploadImages
import com.gu.mediaservice.lib.auth._
import com.gu.scanamo.error.{ConditionNotMet, DynamoReadError, ScanamoError}
import lib._
import model.{StatusType, UploadStatus}
import play.api.libs.json.Json
import org.scanamo.{ConditionNotMet, ScanamoError}
import play.api.mvc._

import java.net.URI
import scala.concurrent.{ExecutionContext, Future}

class UploadStatusController(auth: Authentication,
Expand Down
31 changes: 16 additions & 15 deletions image-loader/app/lib/UploadStatusTable.scala
Original file line number Diff line number Diff line change
@@ -1,50 +1,51 @@
package lib

import com.gu.mediaservice.lib.aws.DynamoDB
import com.gu.scanamo._
import com.gu.scanamo.error.DynamoReadError
import com.gu.scanamo.query.{AndCondition, AttributeExists, Condition, ConditionExpression, KeyEquals}
import com.gu.scanamo.syntax._
import org.scanamo._
import org.scanamo.syntax._
import org.scanamo.generic.auto._
import model.StatusType.{Prepared, Queued}
import model.{UploadStatus, UploadStatusRecord}
import software.amazon.awssdk.services.dynamodb.{DynamoDbAsyncClient, DynamoDbAsyncClientBuilder}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class UploadStatusTable(config: ImageLoaderConfig) extends DynamoDB(config, config.uploadStatusTable) {
class UploadStatusTable(config: ImageLoaderConfig) {

val client = config.withAWSCredentialsV2(DynamoDbAsyncClient.builder()).build()
val scanamo = ScanamoAsync(client)
private val uploadStatusTable = Table[UploadStatusRecord](config.uploadStatusTable)

def getStatus(imageId: String) = {
ScanamoAsync.exec(client)(uploadStatusTable.get('id -> imageId))
scanamo.exec(uploadStatusTable.get("id" === imageId))
}

def setStatus(uploadStatus: UploadStatusRecord) = {
ScanamoAsync.exec(client)(uploadStatusTable.put(uploadStatus))
scanamo.exec(uploadStatusTable.put(uploadStatus))
}

def updateStatus(imageId: String, updateRequest: UploadStatus) = {
val updateExpression = updateRequest.errorMessage match {
case Some(error) => set('status -> updateRequest.status) and set('errorMessages -> error)
case None => set('status -> updateRequest.status)
case Some(error) => set("status", updateRequest.status) and set("errorMessages", error)
case None => set("status", updateRequest.status)
}
val uploadStatusTableWithCondition =
if(updateRequest.status == Queued) // can only transition to Queued status from Prepared status
uploadStatusTable.given(attributeExists('id) and ('status -> Prepared.toString))
uploadStatusTable.when(attributeExists("id") and ("status" === Prepared.toString))
else
uploadStatusTable.given(attributeExists('id))
uploadStatusTable.when(attributeExists("id"))

ScanamoAsync.exec(client)(
scanamo.exec(
uploadStatusTableWithCondition
.update(
'id -> imageId,
key = "id" === imageId,
update = updateExpression
)
)
}

def queryByUser(user: String): Future[List[UploadStatusRecord]] = {
ScanamoAsync.exec(client)(uploadStatusTable.scan()).map {
scanamo.exec(uploadStatusTable.scan()).map {
case Nil => List.empty[UploadStatusRecord]
case recordsAndErrors => {
recordsAndErrors
Expand Down
60 changes: 30 additions & 30 deletions leases/app/controllers/MediaLeaseController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ class MediaLeaseController(auth: Authentication, store: LeaseStore, config: Leas
respond(appIndex, indexLinks)
}

private def clearLease(id: String) = store.get(id).map { lease =>
private def clearLease(id: String) = store.get(id).collect { case Some(lease) =>
store.delete(id).map { _ => notifications.sendRemoveLease(lease.mediaId, id)}
}
}.flatten

private def clearLeases(id: String) = Future.sequence(store.getForMedia(id)
.flatMap(_.id)
.flatten(clearLease))
private def clearLeases(id: String) = store.getForMedia(id).flatMap { leases =>
Future.sequence(leases.map(_.id).collect {
case Some(id) => clearLease(id)
})
}

private def badRequest(e: Seq[(JsPath, Seq[JsonValidationError])]) =
respondError(BadRequest, "media-leases-parse-failed", JsError.toJson(e).toString)
Expand All @@ -50,9 +52,11 @@ class MediaLeaseController(auth: Authentication, store: LeaseStore, config: Leas
private def addLease(mediaLease: MediaLease, userId: Option[String]) = {
val lease = prepareLeaseForSave(mediaLease, userId)
if (lease.isSyndication) {
val leasesForMedia = store.getForMedia(mediaLease.mediaId)
val leasesWithoutSyndication = leasesForMedia.filter(!_.isSyndication)
replaceLeases(leasesWithoutSyndication :+ lease, mediaLease.mediaId, userId)
for {
leasesForMedia <- store.getForMedia(mediaLease.mediaId)
leasesWithoutSyndication = leasesForMedia.filter(!_.isSyndication)
replacement <- replaceLeases(leasesWithoutSyndication :+ lease, mediaLease.mediaId, userId)
} yield replacement
} else {
store.put(lease).map { _ =>
notifications.sendAddLease(lease)
Expand All @@ -79,14 +83,16 @@ class MediaLeaseController(auth: Authentication, store: LeaseStore, config: Leas

def index = auth { _ => indexResponse }

def reindex = auth.async { _ => Future {
store.forEach { leases =>
leases
.foldLeft(Set[String]())((ids, lease) => ids + lease.mediaId)
.foreach(notifications.sendReindexLeases)
}
Accepted
}}
def reindex = auth.async { _ =>
for {
reindexRequests <- store.forEach { leases =>
leases
.foldLeft(Set[String]())((ids, lease) => ids + lease.mediaId)
.map(notifications.sendReindexLeases)
}
_ <- Future.sequence(reindexRequests)
} yield Accepted
}

def postLease = auth.async(parse.json) { implicit request =>
request.body.validate[MediaLease] match {
Expand All @@ -107,15 +113,12 @@ class MediaLeaseController(auth: Authentication, store: LeaseStore, config: Leas
}
}

def deleteLease(id: String) = auth.async { implicit request => Future {
clearLease(id)
Accepted
}
def deleteLease(id: String) = auth.async { implicit request =>
for { _ <- clearLease(id) } yield Accepted
}

def getLease(id: String) = auth.async { _ => Future {
val leases = store.get(id)

def getLease(id: String) = auth.async { _ =>
for { leases <- store.get(id) } yield {
leases.foldLeft(notFound)((_, lease) => respond[MediaLease](
uri = config.leaseUri(id),
data = lease,
Expand All @@ -127,10 +130,8 @@ class MediaLeaseController(auth: Authentication, store: LeaseStore, config: Leas
}


def deleteLeasesForMedia(id: String) = auth.async { _ => Future {
clearLeases(id)
Accepted
}
def deleteLeasesForMedia(id: String) = auth.async { _ =>
for { _ <- clearLeases(id) } yield Accepted
}

def validateLeases(leases: List[MediaLease]) = leases.count { _.isSyndication } <= 1
Expand All @@ -149,9 +150,8 @@ class MediaLeaseController(auth: Authentication, store: LeaseStore, config: Leas
)
}}

def getLeasesForMedia(id: String) = auth.async { _ => Future {
val leases = store.getForMedia(id)

def getLeasesForMedia(id: String) = auth.async { _ =>
for { leases <- store.getForMedia(id) } yield {
respond[LeasesByMedia](
uri = config.leasesMediaUri(id),
links = List(config.mediaApiLink(id)),
Expand Down
11 changes: 7 additions & 4 deletions leases/app/lib/LeaseNotifier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import com.gu.mediaservice.model.leases.MediaLease
import com.gu.mediaservice.syntax.MessageSubjects
import org.joda.time.DateTime

import scala.concurrent.ExecutionContext

class LeaseNotifier(config: LeasesConfig, store: LeaseStore) extends ThrallMessageSender(config.thrallKinesisStreamConfig) with MessageSubjects {
def sendReindexLeases(mediaId: String) = {
val leases = store.getForMedia(mediaId)
val updateMessage = UpdateMessage(subject = ReplaceImageLeases, leases = Some(leases), id = Some(mediaId) )
publish(updateMessage)
def sendReindexLeases(mediaId: String)(implicit ec: ExecutionContext) = {
for { leases <- store.getForMedia(mediaId) } yield {
val updateMessage = UpdateMessage(subject = ReplaceImageLeases, leases = Some(leases), id = Some(mediaId))
publish(updateMessage)
}
}

def sendAddLease(mediaLease: MediaLease) = {
Expand Down
Loading

0 comments on commit 427322f

Please sign in to comment.