Skip to content

Commit d9b242e

Browse files
authored
Merge pull request #286 from lensesio-dev/public-pr-1911
fix(azure-datalake): auto-create parent dirs on ADLS 404
2 parents f003b7b + 70c5f9a commit d9b242e

File tree

2 files changed

+234
-79
lines changed

2 files changed

+234
-79
lines changed

kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/storage/DatalakeStorageInterface.scala

Lines changed: 122 additions & 62 deletions
Original file line number · Diff line number · Diff line change
@@ -52,6 +52,11 @@ class DatalakeStorageInterface(connectorTaskId: ConnectorTaskId, client: DataLak
5252
extends StorageInterface[DatalakeFileMetadata]
5353
with LazyLogging {
5454

55+
/** Returns the parent directory of a '/'-separated path, or None when the path
  * has no non-empty parent (no separator, or the only separator is at index 0).
  */
private def parentDirectory(path: String): Option[String] =
  path.lastIndexOf('/') match {
    case separatorIdx if separatorIdx > 0 => Some(path.substring(0, separatorIdx))
    case _                                => None
  }
59+
5560
override def list(
5661
bucket: String,
5762
prefix: Option[String],
@@ -172,37 +177,51 @@ class DatalakeStorageInterface(connectorTaskId: ConnectorTaskId, client: DataLak
172177

173178
/** Uploads a local file to the given Data Lake bucket/path.
  *
  * On a 404 / "PathNotFound" failure the parent directory of `path` is created
  * via `createDirectoryIfNotExists` and the upload is retried exactly once.
  *
  * Fix vs. previous revision: the "Completed upload" debug line is now also
  * emitted when the retry (after directory creation) succeeds, and every
  * failure path — including mkdir failure and retry failure — is error-logged,
  * matching the first-attempt logging behaviour.
  */
override def uploadFile(source: UploadableFile, bucket: String, path: String): Either[UploadError, String] = {
  logger.debug(s"[{}] Uploading file from local {} to Data Lake {}:{}", connectorTaskId.show, source, bucket, path)

  // Single upload attempt; returns the new blob's ETag.
  // NOTE(review): `createFile` is defined elsewhere in this class — assumed to
  // create/open the file client without creating parent directories.
  def tryUploadFile(filePath: String, localFilePath: String): Either[Throwable, String] = Try {
    val createFileClient: DataLakeFileClient = createFile(bucket, filePath)
    val response = createFileClient.uploadFromFileWithResponse(
      localFilePath,
      new ParallelTransferOptions(),
      null, // PathHttpHeaders
      null, // Metadata
      new DataLakeRequestConditions(), // RequestConditions to avoid overwriting
      null, // Timeout
      null, // Context
    )
    response.getValue.getETag
  }.toEither

  def logCompleted(): Unit =
    logger.debug(s"[{}] Completed upload from local {} to Data Lake {}:{}",
                 connectorTaskId.show,
                 source,
                 bucket,
                 path,
    )

  def logFailed(th: Throwable): Unit =
    logger.error(s"[{}] Failed upload from local {} to Data Lake {}:{}",
                 connectorTaskId.show,
                 source,
                 bucket,
                 path,
                 th,
    )

  // The SDK reports a missing parent either via HTTP 404 or a "PathNotFound"
  // error-code string in the message.
  def isPathNotFound(dse: DataLakeStorageException): Boolean =
    dse.getStatusCode == 404 || Option(dse.getMessage).exists(_.contains("PathNotFound"))

  for {
    file <- source.validate.toEither
    eTag <- tryUploadFile(path, file.getPath) match {
      case Right(tag) =>
        logCompleted()
        Right(tag)
      case Left(dse: DataLakeStorageException) if isPathNotFound(dse) =>
        parentDirectory(path) match {
          case Some(dir) =>
            createDirectoryIfNotExists(bucket, dir) match {
              case Left(err) =>
                logFailed(err.exception)
                Left(UploadFailedError(err.exception, file))
              case Right(_) =>
                // Parent now exists — retry once, logging the outcome the same
                // way as the first attempt.
                tryUploadFile(path, file.getPath) match {
                  case Right(tag) =>
                    logCompleted()
                    Right(tag)
                  case Left(th) =>
                    logFailed(th)
                    Left(UploadFailedError(th, file))
                }
            }
          case None =>
            // Path has no parent to create; surface the original 404.
            logFailed(dse)
            Left(UploadFailedError(dse, file))
        }
      case Left(other) =>
        logFailed(other)
        Left(UploadFailedError(other, file))
    }
  } yield eTag
}
@@ -260,7 +279,7 @@ class DatalakeStorageInterface(connectorTaskId: ConnectorTaskId, client: DataLak
260279
maybeEtag: Option[String],
261280
): Either[FileMoveError, Unit] = {
262281
val conditions = maybeEtag.map(new DataLakeRequestConditions().setIfMatch(_))
263-
Try(
282+
def tryRenamePath(): Either[Throwable, Unit] = Try {
264283
client.getFileSystemClient(oldBucket).getFileClient(oldPath)
265284
.renameWithResponse(
266285
newBucket,
@@ -269,13 +288,47 @@ class DatalakeStorageInterface(connectorTaskId: ConnectorTaskId, client: DataLak
269288
null,
270289
null,
271290
Context.NONE,
272-
),
273-
).toEither.leftMap(
274-
FileMoveError(_, oldPath, newPath),
275-
).void
291+
)
292+
()
293+
}.toEither
294+
295+
tryRenamePath() match {
296+
case Right(_) => Right(())
297+
case Left(dse: DataLakeStorageException)
298+
if dse.getStatusCode == 404 || Option(dse.getMessage).exists(
299+
_.contains("RenameDestinationParentPathNotFound"),
300+
) =>
301+
parentDirectory(newPath) match {
302+
case Some(dir) =>
303+
createDirectoryIfNotExists(newBucket, dir) match {
304+
case Left(err) => Left(FileMoveError(err.exception, oldPath, newPath))
305+
case Right(_) => tryRenamePath().leftMap(th => FileMoveError(th, oldPath, newPath))
306+
}
307+
case None => Left(FileMoveError(dse, oldPath, newPath))
308+
}
309+
case Left(other) => Left(FileMoveError(other, oldPath, newPath))
310+
}
276311
}
277312

278-
override def createDirectoryIfNotExists(bucket: String, path: String): Either[FileCreateError, Unit] = ().asRight
313+
/** Ensures the directory `path` (and all of its ancestors) exists in `bucket`.
  *
  * A null, empty, or "/"-only path is a no-op success. Any SDK exception is
  * captured and returned as a [[FileCreateError]] carrying the normalized path.
  *
  * Idiom fix vs. previous revision: the mutable `var current` + `foreach`
  * accumulation (with a dead `()` statement) is replaced by a `foldLeft` that
  * threads the growing path prefix through the traversal.
  */
override def createDirectoryIfNotExists(bucket: String, path: String): Either[FileCreateError, Unit] = {
  // Null-safe normalization: trim whitespace and a single leading/trailing '/'.
  val normalizedPath = Option(path).map(_.trim.stripPrefix("/").stripSuffix("/")).getOrElse("")
  if (normalizedPath.isEmpty) ().asRight
  else
    Try {
      val fsClient = client.getFileSystemClient(bucket)
      // Create every ancestor in order: "a", "a/b", "a/b/c".
      // NOTE(review): ADLS Gen2 path creation may already create intermediate
      // directories; per-segment creation is kept as a defensive behaviour —
      // confirm against the service before simplifying.
      normalizedPath.split('/').toList.filter(_.nonEmpty).foldLeft("") { (parent, segment) =>
        val current = if (parent.isEmpty) segment else s"$parent/$segment"
        fsClient.getDirectoryClient(current).createIfNotExists()
        current
      }
      ()
    }.toEither.leftMap(e => FileCreateError(e, normalizedPath)).void
}
279332

280333
override def getBlobAsStringAndEtag(bucket: String, path: String): Either[FileLoadError, (String, String)] =
281334
Try {
@@ -333,41 +386,48 @@ class DatalakeStorageInterface(connectorTaskId: ConnectorTaskId, client: DataLak
333386
case _ => requestConditions
334387
}
335388

336-
for {
337-
resp <- Try {
338-
val createFileClient: DataLakeFileClient = createFile(bucket, path)
339-
val bytes = content.getBytes
340-
Using.resource(new ByteArrayInputStream(bytes)) {
341-
bais =>
342-
createFileClient.append(bais, 0, bytes.length.toLong)
343-
}
344-
val position = bytes.length.toLong
345-
val pathHttpHeaders = new PathHttpHeaders()
346-
val retainUncommittedData = true
347-
val close = false // or true, if you want to finalize the file
348-
val context = Context.NONE
349-
350-
val response = createFileClient.flushWithResponse(
351-
position,
352-
retainUncommittedData,
353-
close,
354-
pathHttpHeaders,
355-
protection, // Correctly passed as DataLakeRequestConditions
356-
null, // Timeout duration remains optional
357-
context, // Context remains unchanged
358-
)
359-
389+
def tryWriteBlob(): Either[Throwable, String] = Try {
390+
val createFileClient: DataLakeFileClient = createFile(bucket, path)
391+
val bytes = content.getBytes
392+
Using.resource(new ByteArrayInputStream(bytes)) { bais =>
393+
createFileClient.append(bais, 0, bytes.length.toLong)
394+
}
395+
val position = bytes.length.toLong
396+
val pathHttpHeaders = new PathHttpHeaders()
397+
val retainUncommittedData = true
398+
val close = false // or true, if you want to finalize the file
399+
val context = Context.NONE
400+
val response = createFileClient.flushWithResponse(
401+
position,
402+
retainUncommittedData,
403+
close,
404+
pathHttpHeaders,
405+
protection,
406+
null,
407+
context,
408+
)
409+
response.getValue.getETag
410+
}.toEither
411+
412+
tryWriteBlob() match {
413+
case Right(eTag) =>
360414
logger.debug(
361415
s"[${connectorTaskId.show}] Completed upload from data string ($content) to datalake $bucket:$path",
362416
)
363-
response
364-
}.toEither.leftMap {
365-
ex =>
366-
logger.error(s"[{connectorTaskId.show}] Failed upload from data string ($content) to datalake $bucket:$path",
367-
ex,
368-
)
369-
FileCreateError(ex, content)
370-
}
371-
} yield new ObjectWithETag[O](objectProtection.wrappedObject, resp.getValue.getETag)
417+
Right(new ObjectWithETag[O](objectProtection.wrappedObject, eTag))
418+
case Left(dse: DataLakeStorageException)
419+
if dse.getStatusCode == 404 || Option(dse.getMessage).exists(_.contains("PathNotFound")) =>
420+
parentDirectory(path) match {
421+
case Some(dir) =>
422+
createDirectoryIfNotExists(bucket, dir) match {
423+
case Left(err) => Left(FileCreateError(err.exception, content))
424+
case Right(_) => tryWriteBlob().leftMap(ex => FileCreateError(ex, content)).map(et =>
425+
new ObjectWithETag[O](objectProtection.wrappedObject, et),
426+
)
427+
}
428+
case None => Left(FileCreateError(dse, content))
429+
}
430+
case Left(other) => Left(FileCreateError(other, content))
431+
}
372432
}
373433
}

0 commit comments

Comments
 (0)