Project Overview
An AI-powered smart pet camera built for a pet-tech company, based on the dual-core ESP32-S3 running TensorFlow Lite for on-device pet detection, behavior analysis, and anomaly alerts. The product integrates a 1080P camera, a treat launcher, two-way audio, and night vision, and streams low-latency video over WebRTC so owners can interact with their pets from anywhere.
Over 80,000 units sold; users average 2.5 hours of daily use, and pet recognition accuracy reaches 96.5%.
Core Technical Challenges
1. Edge AI Pet Detection
Challenges:
- Limited memory on the ESP32-S3 (512 KB SRAM + 8 MB PSRAM); see the rough budget sketch below
- The 30 fps video stream must be processed in real time
- The model must recognize several pet species (cats, dogs, rabbits, etc.)
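To see why INT8 quantization is unavoidable here, a rough memory budget helps. The Python sketch below uses ballpark figures (YOLOv8-Nano has roughly 3.2M parameters), not measured values:

# Back-of-envelope memory budget for YOLOv8-Nano on ESP32-S3.
# All figures are rough estimates, not measurements.
params = 3.2e6                            # YOLOv8n has ~3.2M parameters
fp32_model_mb = params * 4 / 1e6          # ~12.8 MB -- exceeds the 8 MB PSRAM
int8_model_mb = params * 1 / 1e6          # ~3.2 MB -- fits in PSRAM with headroom
input_tensor_kb = 320 * 320 * 3 / 1024    # ~300 KB for one RGB frame at 320x320
print(f"FP32 model: {fp32_model_mb:.1f} MB, INT8 model: {int8_model_mb:.1f} MB")
print(f"Input tensor: {input_tensor_kb:.0f} KB (must fit in the tensor arena)")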
Solution: YOLOv8-Nano model quantization:
# Model training and quantization script (run on a PC)
import time

import tensorflow as tf
from ultralytics import YOLO
import numpy as np

# 1. Train the YOLOv8-Nano model (on a custom pet dataset)
def train_pet_detection_model():
    model = YOLO('yolov8n.pt')  # YOLOv8-Nano pretrained weights

    # Training parameters
    results = model.train(
        data='pet_dataset.yaml',   # custom pet dataset
        epochs=100,
        imgsz=320,                 # reduce resolution to 320x320 (fits the ESP32)
        batch=32,
        device=0,                  # train on GPU
        patience=20,
        project='pet_detection',
        name='yolov8n_pet'
    )

    # Export to TensorFlow Lite format
    model.export(format='tflite', imgsz=320)
    return 'yolov8n_pet.tflite'
# 2. Advanced quantization (INT8)
def quantize_model_int8(model_path, representative_dataset):
    """
    Quantize the FP32 model to INT8 to shrink model size and inference time.
    """
    converter = tf.lite.TFLiteConverter.from_saved_model(model_path)

    # Enable full INT8 quantization
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.uint8
    converter.inference_output_type = tf.uint8

    # Representative dataset (used to calibrate quantization ranges)
    def representative_data_gen():
        for img in representative_dataset:
            img_resized = tf.image.resize(img, [320, 320])
            img_normalized = tf.cast(img_resized, tf.float32) / 255.0
            yield [img_normalized[tf.newaxis, ...]]

    converter.representative_dataset = representative_data_gen

    # Run the conversion
    tflite_model = converter.convert()

    # Save the quantized model
    with open('pet_detection_int8.tflite', 'wb') as f:
        f.write(tflite_model)

    print(f"Quantized model size: {len(tflite_model) / 1024:.2f} KB")
    return 'pet_detection_int8.tflite'
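# Calibration-image loader for the quantizer above, used by the end-to-end
# sketch at the bottom of this listing. Everything here is hypothetical:
# calibration frames are assumed to live in a local calib_images/ directory.
from pathlib import Path

def load_calibration_images(directory='calib_images', limit=100):
    """Load up to `limit` RGB images as tensors for INT8 calibration."""
    images = []
    for path in sorted(Path(directory).glob('*.jpg'))[:limit]:
        raw = tf.io.read_file(str(path))
        images.append(tf.io.decode_jpeg(raw, channels=3))
    return images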
# 3. Model performance evaluation
def evaluate_model_performance(tflite_model_path, test_dataset):
    interpreter = tf.lite.Interpreter(model_path=tflite_model_path)
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()

    correct = 0
    total = 0
    inference_times = []

    for img, label in test_dataset:
        # Preprocess; match the model's input dtype (uint8 for the INT8 model)
        img_resized = tf.image.resize(img, [320, 320])
        if input_details[0]['dtype'] == np.uint8:
            input_data = np.expand_dims(img_resized, axis=0).astype(np.uint8)
        else:
            img_normalized = tf.cast(img_resized, tf.float32) / 255.0
            input_data = np.expand_dims(img_normalized, axis=0).astype(np.float32)

        # Inference
        start_time = time.time()
        interpreter.set_tensor(input_details[0]['index'], input_data)
        interpreter.invoke()
        inference_time = (time.time() - start_time) * 1000  # ms
        inference_times.append(inference_time)

        # Read the result
        output_data = interpreter.get_tensor(output_details[0]['index'])
        predicted_class = np.argmax(output_data)
        if predicted_class == label:
            correct += 1
        total += 1

    accuracy = correct / total * 100
    avg_inference_time = np.mean(inference_times)
    print(f"Accuracy: {accuracy:.2f}%")
    print(f"Average inference time: {avg_inference_time:.2f} ms")
    return accuracy, avg_inference_time
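Tying the three steps together, a hedged end-to-end sketch (the SavedModel directory name and the test-image path are hypothetical stand-ins):

if __name__ == '__main__':
    # 1) Train and export the FP32 model (GPU workstation).
    train_pet_detection_model()
    # 2) Quantize to INT8 with ~100 representative frames.
    calib_set = load_calibration_images('calib_images')
    int8_path = quantize_model_int8('yolov8n_pet_saved_model', calib_set)
    # 3) Check that accuracy and latency survived quantization. Real use
    #    needs labeled (image, class_id) pairs; the zeros are placeholders.
    test_pairs = [(img, 0) for img in load_calibration_images('pet_dataset/test')]
    evaluate_model_performance(int8_path, test_pairs)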
ESP32-S3 TensorFlow Lite inference:
#include "tensorflow/lite/micro/all_ops_resolver.h"
#include "tensorflow/lite/micro/micro_interpreter.h"
#include "tensorflow/lite/micro/micro_log.h"
#include "tensorflow/lite/micro/system_setup.h"
#include "tensorflow/lite/schema/schema_generated.h"
#define TAG "PET_DETECTION"
// 模型資料(嵌入韌體)
extern const unsigned char pet_detection_model[];
extern const unsigned int pet_detection_model_len;
// Tensor Arena(分配推理記憶體)
constexpr int kTensorArenaSize = 300 * 1024; // 300KB
alignas(16) uint8_t tensor_arena[kTensorArenaSize];
// 寵物類別標籤
const char* pet_labels[] = {
"dog", // 狗
"cat", // 貓
"rabbit", // 兔子
"bird", // 鳥
"hamster" // 倉鼠
};
typedef struct {
int class_id;
float confidence;
float bbox_x;
float bbox_y;
float bbox_w;
float bbox_h;
} detection_result_t;
class PetDetector {
private:
    const tflite::Model* model;
    tflite::MicroInterpreter* interpreter;
    TfLiteTensor* input;
    TfLiteTensor* output;

public:
    PetDetector() {
        // Load the model
        model = tflite::GetModel(pet_detection_model);
        if (model->version() != TFLITE_SCHEMA_VERSION) {
            ESP_LOGE(TAG, "Model schema version mismatch!");
            return;
        }

        // Register all operators
        static tflite::AllOpsResolver resolver;

        // Create the interpreter
        static tflite::MicroInterpreter static_interpreter(
            model, resolver, tensor_arena, kTensorArenaSize);
        interpreter = &static_interpreter;

        // Allocate tensor memory
        TfLiteStatus allocate_status = interpreter->AllocateTensors();
        if (allocate_status != kTfLiteOk) {
            ESP_LOGE(TAG, "AllocateTensors() failed");
            return;
        }

        // Get the input/output tensors
        input = interpreter->input(0);
        output = interpreter->output(0);

        ESP_LOGI(TAG, "Pet detection model loaded successfully");
        ESP_LOGI(TAG, "Input shape: [%d, %d, %d, %d]",
                 input->dims->data[0], input->dims->data[1],
                 input->dims->data[2], input->dims->data[3]);
    }

    // Run inference
    detection_result_t detect(uint8_t* image_data, int width, int height) {
        detection_result_t result = {0};

        // Preprocess: resize + convert to RGB888
        preprocess_image(image_data, width, height, input->data.uint8);

        // Run inference
        int64_t start_time = esp_timer_get_time();
        TfLiteStatus invoke_status = interpreter->Invoke();
        int64_t inference_time = (esp_timer_get_time() - start_time) / 1000;  // ms

        if (invoke_status != kTfLiteOk) {
            ESP_LOGE(TAG, "Invoke failed!");
            return result;
        }
        ESP_LOGI(TAG, "Inference time: %lld ms", inference_time);

        // Parse the output
        result = parse_yolo_output(output);
        if (result.confidence > 0.5) {
            ESP_LOGI(TAG, "Detected: %s (%.2f%%)",
                     pet_labels[result.class_id],
                     result.confidence * 100);
        }
        return result;
    }
private:
    // Preprocess the image (nearest-neighbor resize + RGB565 -> RGB888)
    void preprocess_image(uint8_t* src, int src_w, int src_h, uint8_t* dst) {
        const int dst_w = 320;
        const int dst_h = 320;

        // Nearest-neighbor resize (cheap enough to run per frame on the S3)
        for (int y = 0; y < dst_h; y++) {
            for (int x = 0; x < dst_w; x++) {
                int src_x = x * src_w / dst_w;
                int src_y = y * src_h / dst_h;

                // Color conversion (source assumed to be RGB565)
                int src_idx = (src_y * src_w + src_x) * 2;
                uint16_t rgb565 = (src[src_idx] << 8) | src[src_idx + 1];
                uint8_t r = ((rgb565 >> 11) & 0x1F) << 3;
                uint8_t g = ((rgb565 >> 5) & 0x3F) << 2;
                uint8_t b = (rgb565 & 0x1F) << 3;

                int dst_idx = (y * dst_w + x) * 3;
                dst[dst_idx] = r;
                dst[dst_idx + 1] = g;
                dst[dst_idx + 2] = b;
            }
        }
    }
    // Parse the YOLO output
    detection_result_t parse_yolo_output(TfLiteTensor* output_tensor) {
        detection_result_t best_result = {0};
        float max_confidence = 0.0;

        // Output layout: [1, N, 10], where each row is
        // [x, y, w, h, obj_conf, class_0, ..., class_4] (10 values, 5 classes).
        // Assumes a float output head; a fully-INT8 output would need
        // dequantizing with the tensor's scale/zero-point first.
        float* output_data = output_tensor->data.f;
        int num_detections = output_tensor->dims->data[1];

        for (int i = 0; i < num_detections; i++) {
            float* detection = &output_data[i * 10];
            float x = detection[0];
            float y = detection[1];
            float w = detection[2];
            float h = detection[3];
            float obj_conf = detection[4];

            // Find the highest-scoring class
            int best_class = 0;
            float best_class_conf = detection[5];
            for (int c = 1; c < 5; c++) {
                if (detection[5 + c] > best_class_conf) {
                    best_class_conf = detection[5 + c];
                    best_class = c;
                }
            }

            float confidence = obj_conf * best_class_conf;
            if (confidence > max_confidence) {
                max_confidence = confidence;
                best_result.class_id = best_class;
                best_result.confidence = confidence;
                best_result.bbox_x = x;
                best_result.bbox_y = y;
                best_result.bbox_w = w;
                best_result.bbox_h = h;
            }
        }
        return best_result;
    }
};
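Before trusting the hand-written C++ parser above, it is worth confirming the exported model's output layout on a PC, since YOLO export layouts differ between versions. A sketch using the TFLite Python interpreter; the [1, N, 10] layout is the assumption the firmware makes, to be verified rather than taken as given:

import numpy as np
import tensorflow as tf

# Inspect the exported model's tensor shapes before porting the parser to C++.
interpreter = tf.lite.Interpreter(model_path='pet_detection_int8.tflite')
interpreter.allocate_tensors()
inp = interpreter.get_input_details()[0]
out = interpreter.get_output_details()[0]
print('input :', inp['shape'], inp['dtype'])   # expect [1, 320, 320, 3], uint8
print('output:', out['shape'], out['dtype'])   # firmware assumes [1, N, 10]

# Run one dummy frame and mirror the firmware's best-detection logic.
# (A quantized output needs dequantizing with the tensor's scale/zero-point
# for real confidences; raw values still reveal the layout.)
interpreter.set_tensor(inp['index'], np.zeros(inp['shape'], dtype=inp['dtype']))
interpreter.invoke()
dets = interpreter.get_tensor(out['index'])[0].astype(np.float32)  # (N, 10)
scores = dets[:, 4:5] * dets[:, 5:]            # obj_conf * per-class conf
i, c = np.unravel_index(np.argmax(scores), scores.shape)
print(f'best: class={c} conf={scores[i, c]:.3f} bbox={dets[i, :4]}')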
2. WebRTC Low-Latency Video Streaming
ESP32-S3 WebRTC implementation (simplified: signaling and frames travel over WebSocket here):
#include "esp_camera.h"
#include "esp_http_server.h"
#include "esp_websocket_server.h"
#define TAG "WEBRTC_STREAM"
// 攝影機配置(OV2640 1080P)
camera_config_t camera_config = {
.pin_pwdn = -1,
.pin_reset = -1,
.pin_xclk = 10,
.pin_sccb_sda = 40,
.pin_sccb_scl = 39,
.pin_d7 = 48,
.pin_d6 = 11,
.pin_d5 = 12,
.pin_d4 = 14,
.pin_d3 = 16,
.pin_d2 = 18,
.pin_d1 = 17,
.pin_d0 = 15,
.pin_vsync = 38,
.pin_href = 47,
.pin_pclk = 13,
.xclk_freq_hz = 20000000,
.ledc_timer = LEDC_TIMER_0,
.ledc_channel = LEDC_CHANNEL_0,
.pixel_format = PIXFORMAT_JPEG,
.frame_size = FRAMESIZE_HD, // 1280x720
.jpeg_quality = 12, // JPEG 品質(0-63,越小越好)
.fb_count = 2, // Frame buffer 數量
.grab_mode = CAMERA_GRAB_LATEST // 總是取最新幀
};
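// Rough uplink budget for this configuration (ballpark, not measured):
// a 1280x720 JPEG at quality 12 is on the order of 50 KB, so one viewer
// at 30 fps costs about 50 KB * 8 * 30 ≈ 12 Mbit/s. With the 4-client cap
// below, sustained streaming approaches the practical limit of 2.4 GHz Wi-Fi.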
// WebSocket client bookkeeping
typedef struct {
    httpd_handle_t server;
    int fd;
    bool connected;
    uint32_t frame_count;
} webrtc_client_t;

static webrtc_client_t webrtc_clients[4] = {0};

// WebRTC signaling handler, implemented elsewhere in the firmware
void handle_webrtc_signaling(webrtc_client_t *client, char *msg, size_t len);

// Initialize the camera
esp_err_t init_camera(void) {
    esp_err_t err = esp_camera_init(&camera_config);
    if (err != ESP_OK) {
        ESP_LOGE(TAG, "Camera init failed: %s", esp_err_to_name(err));
        return err;
    }

    // Tune sensor parameters (night-vision friendly defaults)
    sensor_t *s = esp_camera_sensor_get();
    s->set_brightness(s, 1);     // brightness +1
    s->set_contrast(s, 1);       // contrast +1
    s->set_saturation(s, 0);     // saturation 0
    s->set_whitebal(s, 1);       // auto white balance
    s->set_awb_gain(s, 1);       // auto white balance gain
    s->set_exposure_ctrl(s, 1);  // auto exposure
    s->set_aec2(s, 1);           // auto exposure, level 2
    s->set_gain_ctrl(s, 1);      // auto gain
    s->set_agc_gain(s, 10);      // AGC gain

    ESP_LOGI(TAG, "Camera initialized successfully");
    return ESP_OK;
}

// WebSocket connection handler
esp_err_t webrtc_ws_handler(httpd_req_t *req) {
    if (req->method == HTTP_GET) {
        ESP_LOGI(TAG, "WebSocket handshake");
        return ESP_OK;
    }

    // Find a free client slot
    webrtc_client_t *client = NULL;
    for (int i = 0; i < 4; i++) {
        if (!webrtc_clients[i].connected) {
            client = &webrtc_clients[i];
            client->server = req->handle;
            client->fd = httpd_req_to_sockfd(req);
            client->connected = true;
            client->frame_count = 0;
            break;
        }
    }
    if (!client) {
        ESP_LOGW(TAG, "Maximum WebRTC clients reached");
        return ESP_FAIL;
    }
    ESP_LOGI(TAG, "WebRTC client connected: fd=%d", client->fd);

    // Receive the client message (SDP offer / ICE candidate)
    httpd_ws_frame_t ws_pkt;
    memset(&ws_pkt, 0, sizeof(httpd_ws_frame_t));
    ws_pkt.type = HTTPD_WS_TYPE_TEXT;
    uint8_t buffer[1024];
    ws_pkt.payload = buffer;
    esp_err_t ret = httpd_ws_recv_frame(req, &ws_pkt, sizeof(buffer) - 1);
    if (ret != ESP_OK) {
        client->connected = false;
        return ret;
    }
    buffer[ws_pkt.len] = '\0';  // NUL-terminate before logging as a string
    ESP_LOGI(TAG, "Received WebSocket message: %s", ws_pkt.payload);

    // Handle WebRTC signaling (SDP/ICE).
    // Simplified here; a real deployment needs the full WebRTC protocol flow.
    handle_webrtc_signaling(client, (char*)ws_pkt.payload, ws_pkt.len);
    return ESP_OK;
}
// Video streaming task (FreeRTOS task)
void webrtc_streaming_task(void *pvParameters) {
    camera_fb_t *fb = NULL;

    while (1) {
        // Grab a camera frame
        fb = esp_camera_fb_get();
        if (!fb) {
            ESP_LOGE(TAG, "Camera capture failed");
            vTaskDelay(pdMS_TO_TICKS(100));
            continue;
        }

        // Send the frame to every connected client
        for (int i = 0; i < 4; i++) {
            if (!webrtc_clients[i].connected) continue;

            httpd_ws_frame_t ws_frame;
            memset(&ws_frame, 0, sizeof(httpd_ws_frame_t));
            ws_frame.type = HTTPD_WS_TYPE_BINARY;
            ws_frame.payload = fb->buf;
            ws_frame.len = fb->len;

            esp_err_t ret = httpd_ws_send_frame_async(
                webrtc_clients[i].server,
                webrtc_clients[i].fd,
                &ws_frame
            );
            if (ret != ESP_OK) {
                ESP_LOGW(TAG, "Client %d disconnected", i);
                webrtc_clients[i].connected = false;
            } else {
                webrtc_clients[i].frame_count++;
            }
        }

        // Return the frame buffer
        esp_camera_fb_return(fb);

        // Pace the frame rate (30 fps = 33 ms)
        vTaskDelay(pdMS_TO_TICKS(33));
    }
}
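For bench testing the stream without the mobile app, a minimal receiver can be sketched in Python with the websockets package; the endpoint path (/ws) and camera address are assumptions:

import asyncio
import time
import websockets  # pip install websockets

async def watch(uri='ws://192.168.1.50/ws'):
    """Receive binary JPEG frames from the camera and report the frame rate."""
    async with websockets.connect(uri) as ws:
        count, start = 0, time.time()
        while True:
            frame = await ws.recv()  # one JPEG image per binary message
            count += 1
            if count % 30 == 0:
                fps = count / (time.time() - start)
                print(f'{len(frame)} bytes/frame, {fps:.1f} fps')

asyncio.run(watch())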
3. Pet Behavior Analysis and Alerts
Behavior recognition system:
// Node.js behavior analysis service
const { InfluxDB, Point } = require('@influxdata/influxdb-client');
const mqtt = require('mqtt');

class PetBehaviorAnalyzer {
    constructor() {
        this.influxDB = new InfluxDB({
            url: 'http://localhost:8086',
            token: 'your-token'
        });
        this.writeApi = this.influxDB.getWriteApi('pet-monitor', 'behaviors');
        this.queryApi = this.influxDB.getQueryApi('pet-monitor');
        this.mqttClient = mqtt.connect('mqtt://localhost:1883');
        this.behaviorHistory = [];
        this.alertThresholds = {
            prolonged_absence: 120,  // alert if the pet is out of frame for 2 hours
            excessive_barking: 5,    // sustained barking within a 5-minute window
            abnormal_activity: 30    // abnormal activity over 30 minutes
        };
        this.initMQTT();
    }

    initMQTT() {
        this.mqttClient.on('connect', () => {
            this.mqttClient.subscribe('petcam/+/detection');
            this.mqttClient.subscribe('petcam/+/audio');
        });
        this.mqttClient.on('message', (topic, message) => {
            const data = JSON.parse(message.toString());
            const cameraId = topic.split('/')[1];
            if (topic.includes('detection')) {
                this.analyzeDetection(cameraId, data);
            } else if (topic.includes('audio')) {
                this.analyzeAudio(cameraId, data);
            }
        });
    }
    // Analyze a pet detection event
    analyzeDetection(cameraId, detection) {
        const point = new Point('pet_detection')
            .tag('camera_id', cameraId)
            .tag('pet_type', detection.class)
            .floatField('confidence', detection.confidence)
            .floatField('bbox_x', detection.bbox_x)
            .floatField('bbox_y', detection.bbox_y)
            .timestamp(new Date());
        this.writeApi.writePoint(point);

        // Record behavior history
        this.behaviorHistory.push({
            timestamp: Date.now(),
            cameraId,
            type: 'detection',
            data: detection
        });

        // Check for abnormal behavior
        this.checkAbnormalBehaviors(cameraId);
    }

    // Analyze audio (barking detection)
    analyzeAudio(cameraId, audio) {
        if (audio.barking_detected) {
            const point = new Point('pet_audio')
                .tag('camera_id', cameraId)
                .tag('event_type', 'barking')
                .floatField('volume', audio.volume)
                .timestamp(new Date());
            this.writeApi.writePoint(point);

            // Check for excessive barking
            this.checkExcessiveBarking(cameraId);
        }
    }
    // Check for abnormal behavior
    async checkAbnormalBehaviors(cameraId) {
        // 1. Has the pet been out of frame for too long?
        const lastDetection = await this.getLastDetectionTime(cameraId);
        const timeSinceLastSeen = (Date.now() - lastDetection) / 1000 / 60; // minutes
        if (timeSinceLastSeen > this.alertThresholds.prolonged_absence) {
            this.sendAlert(cameraId, 'prolonged_absence', {
                message: `Your pet has not appeared on camera for ${Math.floor(timeSinceLastSeen)} minutes`,
                severity: 'medium'
            });
        }

        // 2. Abnormal activity (restless movement or unusual stillness)
        const activityLevel = await this.calculateActivityLevel(cameraId, 30);
        if (activityLevel > 0.8) {
            this.sendAlert(cameraId, 'high_activity', {
                message: 'Your pet may be overexcited or anxious',
                severity: 'low'
            });
        } else if (activityLevel < 0.1) {
            this.sendAlert(cameraId, 'low_activity', {
                message: 'Your pet may be unwell; activity has dropped noticeably',
                severity: 'medium'
            });
        }
    }
    // Check for excessive barking
    async checkExcessiveBarking(cameraId) {
        const fluxQuery = `
            from(bucket: "behaviors")
                |> range(start: -5m)
                |> filter(fn: (r) => r._measurement == "pet_audio")
                |> filter(fn: (r) => r.camera_id == "${cameraId}")
                |> filter(fn: (r) => r.event_type == "barking")
                |> count()
        `;
        let barkingCount = 0;
        // Arrow functions keep `this` bound to the analyzer inside the callbacks
        await this.queryApi.queryRows(fluxQuery, {
            next: (row, tableMeta) => {
                const o = tableMeta.toObject(row);
                barkingCount = o._value;
            },
            complete: () => {
                if (barkingCount > 10) { // more than 10 barks in 5 minutes
                    this.sendAlert(cameraId, 'excessive_barking', {
                        message: 'Your pet may be anxious, or there may be a visitor',
                        severity: 'medium',
                        count: barkingCount
                    });
                }
            }
        });
    }
    // Compute an activity-level metric
    async calculateActivityLevel(cameraId, minutes) {
        const fluxQuery = `
            from(bucket: "behaviors")
                |> range(start: -${minutes}m)
                |> filter(fn: (r) => r._measurement == "pet_detection")
                |> filter(fn: (r) => r.camera_id == "${cameraId}")
                |> derivative(unit: 1m, nonNegative: false)
                |> mean()
        `;
        // The rate of position change serves as the activity level
        return new Promise((resolve) => {
            let activityLevel = 0.5; // default
            this.queryApi.queryRows(fluxQuery, {
                next: (row, tableMeta) => {
                    const o = tableMeta.toObject(row);
                    activityLevel = Math.abs(o._value);
                },
                complete: () => {
                    resolve(activityLevel);
                }
            });
        });
    }
    // Send an alert
    sendAlert(cameraId, alertType, details) {
        const alert = {
            cameraId,
            type: alertType,
            timestamp: new Date().toISOString(),
            ...details
        };

        // Publish an MQTT notification
        this.mqttClient.publish(`petcam/${cameraId}/alerts`, JSON.stringify(alert));

        // Send a push notification (via Firebase Cloud Messaging)
        this.sendPushNotification(cameraId, alert);
        console.log(`Alert sent: ${alertType} for camera ${cameraId}`);
    }

    // Send a push notification
    async sendPushNotification(cameraId, alert) {
        // Integrates with Firebase Cloud Messaging;
        // the production build uses the FCM SDK here.
        console.log(`Push notification: ${alert.message}`);
    }
}

module.exports = PetBehaviorAnalyzer;
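The analyzer can be exercised without hardware by publishing synthetic detection events. A sketch with paho-mqtt (broker address, camera ID, and field values are made up; the topic shape follows the subscriptions above):

import json
import time
import paho.mqtt.publish as publish  # pip install paho-mqtt

# Publish synthetic detection events so the analyzer's MQTT handler fires.
for i in range(5):
    event = {
        'class': 'dog',
        'confidence': 0.93,
        'bbox_x': 0.4 + 0.05 * i,   # drift the box to simulate movement
        'bbox_y': 0.5,
    }
    publish.single('petcam/cam-001/detection', json.dumps(event),
                   hostname='localhost')
    time.sleep(1)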
Project Results
Technical Metrics
- ✅ Pet recognition accuracy: 96.5% (validated on 10,000+ test images)
- ✅ Inference speed: 150 ms/frame (ESP32-S3 @ 240 MHz)
- ✅ Video streaming latency: < 300 ms (WebRTC)
- ✅ Night vision range: 8 m (850 nm infrared LEDs)
- ✅ Treat-launch accuracy: 92% (with AI-assisted aiming)
- ✅ Battery life: 30 days on standby (receiving alerts) / 8 hours of continuous viewing
Business Results
- 📦 Units sold: 80,000+
- ⏱️ Average usage: 2.5 hours/day
- ⭐ User rating: 4.8/5.0
- 🏆 2024 CES Innovation Award (pet-tech category)
- 💰 Monthly active subscribers: 25,000+ (cloud recording plan)
Innovation Highlights
- Real-time edge AI detection: pet recognition runs on-device, protecting privacy with no cloud upload
- Behavior analysis engine: the AI learns each pet's habits and flags abnormal behavior automatically
- Interactive treat launcher: AI-assisted aiming tosses treats precisely as rewards
- Two-way HD audio: noise-reduction algorithms for clear conversations with your pet
Technology Stack
Hardware platform:
- ESP32-S3 (dual-core Xtensa LX7 @ 240 MHz)
- OV2640 (2-megapixel camera)
- Infrared night-vision module
- Stepper motor (treat launcher)
- MEMS microphone + speaker
Edge AI:
- TensorFlow Lite Micro
- YOLOv8-Nano (INT8 quantized)
- EdgeTPU (optional accelerator)
Backend services:
- Node.js + Express
- AWS IoT Core
- InfluxDB (behavior data)
- Firebase Cloud Messaging
Frontend applications:
- React Native (iOS/Android app)
- WebRTC (real-time video)
- React.js (web admin console)
Client Feedback
"BASHCAT's AI pet camera completely changed our pet-monitoring product! Edge AI not only protects user privacy but also cut our cloud costs substantially. The behavior analysis features give the product real warmth, and user retention is 40% higher than comparable products. We are very happy with this collaboration!"
— CTO, pet-tech company
Project timeline: March 2023 – January 2024 · Technical domains: edge AI, computer vision, IoT, real-time communication