Stream download for transcode, multipart upload for large output
This commit is contained in:
@@ -11,7 +11,8 @@ import {
|
|||||||
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
|
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
|
||||||
import { tmpdir } from "os";
|
import { tmpdir } from "os";
|
||||||
import { join } from "path";
|
import { join } from "path";
|
||||||
import { unlink, writeFile } from "fs/promises";
|
import { unlink } from "fs/promises";
|
||||||
|
import { createWriteStream, createReadStream } from "fs";
|
||||||
|
|
||||||
const RUSTFS_ENDPOINT = process.env.RUSTFS_ENDPOINT || "http://localhost:9000";
|
const RUSTFS_ENDPOINT = process.env.RUSTFS_ENDPOINT || "http://localhost:9000";
|
||||||
const RUSTFS_ACCESS_KEY = process.env.RUSTFS_ACCESS_KEY || "minioadmin";
|
const RUSTFS_ACCESS_KEY = process.env.RUSTFS_ACCESS_KEY || "minioadmin";
|
||||||
@@ -224,11 +225,11 @@ export function getStreamUrl(videoId: string): string {
|
|||||||
export async function transcodeVideo(originalKey: string): Promise<string> {
|
export async function transcodeVideo(originalKey: string): Promise<string> {
|
||||||
const s3 = getRustFsClient();
|
const s3 = getRustFsClient();
|
||||||
|
|
||||||
// Download original from S3 to temp file
|
|
||||||
const inputPath = join(tmpdir(), `transcode-input-${Date.now()}`);
|
const inputPath = join(tmpdir(), `transcode-input-${Date.now()}`);
|
||||||
const outputPath = join(tmpdir(), `transcode-output-${Date.now()}.mp4`);
|
const outputPath = join(tmpdir(), `transcode-output-${Date.now()}.mp4`);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// Stream download from S3 to temp file (avoids loading whole file into RAM)
|
||||||
const getCmd = new GetObjectCommand({
|
const getCmd = new GetObjectCommand({
|
||||||
Bucket: RUSTFS_BUCKET,
|
Bucket: RUSTFS_BUCKET,
|
||||||
Key: originalKey,
|
Key: originalKey,
|
||||||
@@ -237,8 +238,33 @@ export async function transcodeVideo(originalKey: string): Promise<string> {
|
|||||||
if (!response.Body) {
|
if (!response.Body) {
|
||||||
throw new Error("Empty response body from RustFS");
|
throw new Error("Empty response body from RustFS");
|
||||||
}
|
}
|
||||||
const bytes = await response.Body.transformToByteArray();
|
|
||||||
await writeFile(inputPath, bytes);
|
const sdkStream = response.Body as any;
|
||||||
|
const webStream: ReadableStream<Uint8Array> =
|
||||||
|
sdkStream.transformToWebStream
|
||||||
|
? sdkStream.transformToWebStream()
|
||||||
|
: sdkStream;
|
||||||
|
|
||||||
|
const fileWriter = createWriteStream(inputPath);
|
||||||
|
const reader = webStream.getReader();
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
if (!fileWriter.write(value)) {
|
||||||
|
await new Promise<void>((resolve) =>
|
||||||
|
fileWriter.once("drain", resolve),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
fileWriter.end();
|
||||||
|
reader.releaseLock();
|
||||||
|
await new Promise<void>((resolve) =>
|
||||||
|
fileWriter.on("finish", resolve),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// Run ffmpeg to transcode
|
// Run ffmpeg to transcode
|
||||||
const proc = Bun.spawn(
|
const proc = Bun.spawn(
|
||||||
@@ -280,10 +306,48 @@ export async function transcodeVideo(originalKey: string): Promise<string> {
|
|||||||
? originalKey.substring(0, lastDot) + ".mp4"
|
? originalKey.substring(0, lastDot) + ".mp4"
|
||||||
: originalKey + ".mp4";
|
: originalKey + ".mp4";
|
||||||
|
|
||||||
// Upload transcoded file to S3
|
// Upload transcoded file to S3 using multipart for large files
|
||||||
const file = Bun.file(outputPath);
|
const outputFile = Bun.file(outputPath);
|
||||||
const buffer = Buffer.from(await file.arrayBuffer());
|
const fileSize = outputFile.size;
|
||||||
await uploadObject(transcodedKey, buffer, "video/mp4");
|
const CHUNK_SIZE = 80 * 1024 * 1024; // 80MB chunks
|
||||||
|
|
||||||
|
if (fileSize < CHUNK_SIZE) {
|
||||||
|
// Small enough for single upload
|
||||||
|
const buffer = Buffer.from(await outputFile.arrayBuffer());
|
||||||
|
await uploadObject(transcodedKey, buffer, "video/mp4");
|
||||||
|
} else {
|
||||||
|
// Multipart upload for large files
|
||||||
|
const { uploadId } = await createMultipartUpload(
|
||||||
|
transcodedKey,
|
||||||
|
"video/mp4",
|
||||||
|
);
|
||||||
|
const numParts = Math.ceil(fileSize / CHUNK_SIZE);
|
||||||
|
const parts: { PartNumber: number; ETag: string }[] = [];
|
||||||
|
|
||||||
|
for (let i = 0; i < numParts; i++) {
|
||||||
|
const start = i * CHUNK_SIZE;
|
||||||
|
const end = Math.min(start + CHUNK_SIZE, fileSize);
|
||||||
|
const chunk = Buffer.from(
|
||||||
|
await outputFile.slice(start, end).arrayBuffer(),
|
||||||
|
);
|
||||||
|
|
||||||
|
const partNumber = i + 1;
|
||||||
|
const uploadCmd = new UploadPartCommand({
|
||||||
|
Bucket: RUSTFS_BUCKET,
|
||||||
|
Key: transcodedKey,
|
||||||
|
UploadId: uploadId,
|
||||||
|
PartNumber: partNumber,
|
||||||
|
Body: chunk,
|
||||||
|
});
|
||||||
|
const result = await s3.send(uploadCmd);
|
||||||
|
parts.push({
|
||||||
|
PartNumber: partNumber,
|
||||||
|
ETag: result.ETag!,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
await completeMultipartUpload(transcodedKey, uploadId, parts);
|
||||||
|
}
|
||||||
|
|
||||||
return transcodedKey;
|
return transcodedKey;
|
||||||
} finally {
|
} finally {
|
||||||
|
|||||||
Reference in New Issue
Block a user