Criar um microsserviço de preços dinâmicos com Vertex AI e MongoDB Atlas
Francesco Baldissera, Sebastian Rojas Arbulu18 min read • Published Oct 09, 2024 • Updated Oct 09, 2024
Avalie esse Tutorial
No mundo hipercompetitivo do comércio eletrônico, elaborar uma estratégia de preços vencedora é essencial para o crescimento. Felizmente, o big data e o aprendizado de máquina revolucionaram os preços. As empresas agora podem aproveitar o comportamento do cliente em tempo real e os dados da concorrência para ajustar dinamicamente os preços.
Este tutorial mergulha na criação de um microsserviço de preços dinâmicos responsivo que permite que os preços sejam ajustados em tempo real para máxima eficácia. Exploraremos o uso doMongoDB Atlas para seu armazenamento e gerenciamento eficientes de dados, aproveitando o poderdo Google Cloud Platform (GCP) para hospedagem e cálculos complexos. No final, você estará preparado para implementar essa abordagem e liberar o potencial dos preços orientados por dados.
A animação a seguir ilustra o que pretendemos alcançar:
Como visto, esta loja de e-commerce exibe um preço previsto dinamicamente junto com o preço real do produto. O preço previsto é calculado em tempo real usando algoritmos de aprendizado de máquina que analisam as tendências do mercado, a demanda, os preços da concorrência, o comportamento do cliente e os dados de vendas para otimizar as vendas e o lucro.
Antes de começar, vamos estabelecer contexto com uma visão geral do nosso modelo de dados. Nosso microsserviço aproveita a MongoDB Atlas, uma plataforma de dados para desenvolvedores, para alimentar a AI em tempo real em nosso aplicativo de e-commerce. O Atlas armazena nossas funcionalidades de ML em duas collections de chaves, atuando como um armazenamento de funcionalidades. Isso simplifica o gerenciamento de dados, automatiza a tomada de decisões e isola as cargas de trabalho. Com change streams e Atlas Triggers, as atualizações fluem perfeitamente para nossos modelos de AI, minimizando a sobrecarga operacional para empresas e MLOps.
A collection Product no MongoDB Atlas é organizada usando o padrão polimórfico. O padrão polimórfico é útil quando queremos acessar (consultar) informações de uma única coleção. O agrupamento de documentos com base nas consultas que queremos executar (em vez de separar o objeto em tabelas ou coleções) ajuda a melhorar o desempenho. Ao centralizar vários tipos de produtos, esse padrão simplifica o gerenciamento de dados e melhora a eficiência da consulta. A tabela a seguir descreve os principais campos dentro da collection Produtos (não pública), que você usará para criar uma collection no MongoDB Atlas de acordo com este esquema. A tabela inclui os tipos de dados e breves descrições de cada campo:
Nome do campo | Tipo de Dados | Descrição |
---|---|---|
_id | ObjectId | Identificador único do documento; usado pelo MongoDB para fins internos |
Nome | String | Nome do produto |
código | String | Código único que identifica o produto |
reabastecimento automático | Boolean | Indica se o produto está configurado para substituição automática |
id | Inteiro | Identificador numérico do produto; identificador numérico para referência externa |
gênero | String | Categoria de gênero a que o produto se destina |
masterCategory | String | Categoria ampla para o produto |
subCategoria | String | Categoria mais específica na categoria principal |
articleType | String | Tipo de artigo, por exemplo, Colete |
baseColour | String | Cor primária do produto |
temporada | String | Época para a qual o produto é destinado |
Ano | Inteiro | Ano do lançamento do produto |
Uso | String | Uso pretendido do produto, por exemplo, Casual |
Imagem | Objeto | Contém o URL da imagem do produto |
Preço | Objeto | Contém o valor e a moeda do preço do produto; estrutura aninhada |
Descrição | String | Descrição detalhada do produto |
marca | String | Marca do produto |
items | Array de objetos | Contém variantes do produto, incluindo tamanho, informações de estoque e prazo de entrega |
total_stock_sum | Array de objetos | Informações de estoque agregadas em diferentes locais |
pred_price | Duplo | Preço previsto do produto com base em modelos de machine learning; usa um tipo de dados duplo para precisão nas projeções de preços |
Os objetos JSON na collection devem aparecer da seguinte forma:
A utilização da coleção de eventos do MongoDB como um armazenamento de recursos de ML — um repositório centralizado projetado para simplificar o gerenciamento e a entrega de recursos usados em modelos de aprendizado de máquina — oferece diversas vantagens. Isso significa que seu armazenamento de recursos está sempre acessível, reduzindo o tempo de inatividade e melhorando a eficiência das operações de aprendizado de máquina. Por outro lado, os sistemas entre regiões melhoram ainda mais o desempenho ao aproximar as funcionalidades dos modelos que as utilizam. Isso reduz a latência, permitindo um treinamento e serviço do modelo mais rápidos.
Essa collection (não pública) armazenada no MongoDB Atlas serve como repositório para eventos de comportamento do usuário cruciais para treinar nosso modelo de predição de preços. Os campos principais que você usará para criar a collection, de acordo com este esquema, são os seguintes:
Campo | Tipo de Dados | Descrição | Valores de exemplo |
---|---|---|---|
user_name | String | O nome do produto | "MongoDB Notebook" |
product_id | Inteiro | Identificador exclusivo do produto | 98803 |
ação | String | Tipo de ação realizada no produto (interação com o usuário) | "view", "add_to_cart", "purchase" |
Preço | Float | Preço do produto | 18.99 |
timestamp | String | Carimbo de data/hora no formato ISO de quando o evento ocorreu | "2024-03-25T12:36:25.428461" |
encoded_name | Inteiro | Uma versão codificada do nome do produto para modelos de machine learning | 23363195 |
tensor | Array | Uma representação numérica do produto extraído por meio de técnicas de aprendizado de máquina; o tamanho do tensor pode variar dependendo dos requisitos específicos do modelo | [0.0005624396083488047, -0.9579731008383453] |
Um objeto Events, com seus tensores associados, deve ser semelhante a:
Além disso, é importante considerar que nossa solução incorpora vários componentes para facilitar a precificação dinâmica:
- Ingestão de dados: OPub/Sub atua como um pipeline de alta velocidade, introduzindo de forma eficiente grandes quantidades de dados de comportamento do cliente formatados como JSON.
- Processamento de dados: O Vertex AI Workbench oferece um ambiente limpo para a limpeza de dados e o treinamento de modelos TensorFlow. Esses modelos analisam eventos de clientes, nomes de produtos e preços existentes para prever o preço ideal para cada item.
- Armazenamento de funcionalidades: O MongoDB Atlas serve como um hub central para todas as funcionalidades usadas pelo modelo. Isso garante consistência entre os dados usados para treinamento e os dados usados para predições em tempo real, bem como os dados operacionais para seus aplicativos, reduzindo assim a sobrecarga de "in-app analytics. " Isso também simplifica o processo geral mantendo tudo em um lugar.
- Orquestração do modelo: Cloud Functions agem como um condutor, direcionando o fluxo de dados de eventos do cliente. Eles pegam os dados do Pub/Sub, transformam em um formato utilizável pelo modelo (tensores) e os armazenam no MongoDB Atlas. Isso permite que o modelo acesse facilmente as informações de que precisa.
A arquitetura foi projetada para aprimorar estratégias de preços por meio de aprendizado profundo e otimização contínua do modelo.
- Ingestão de eventos: os dados do evento do cliente são inseridos em um tópico do Google Cloud Pub/Sub, servindo como ponto de entrada para dados em temporeal.
- Processamento de dados: uma função Cloud é acionada por meio de uma assinatura push do tópico Pub/Sub. Essa função transforma dados brutos de eventos em um formato de tensor estruturado.
- Invocação de modelo e atualização de preço: a mesma função do cloud chama um endpoint de modelo implantado (por exemplo, no Vertex AI) com os dados do tensor para prever o preço. Em seguida, ela atualiza o preço previsto na collection do catálogo de produtos do MongoDB.
- Atualização do repositório de recursos: ao mesmo tempo, a função Cloud envia os dados do tensor para a coleção MongoDB Events, que atua como um repositório de recursos. Cada evento tem seu próprio tensor.
- Controle de versão e acessibilidade: No momento, os dados do armazenamento de recursos não têm controle de versão. O padrão de controle de versão é útil para um armazenamento futuro porque aborda o problema de querer manter as revisões mais antigas dos dados no MongoDB, evitando a necessidade de um sistema de gerenciamento separado.
Importante: certifique-se de verificar o guia de versionamento de padrões. O controle de versão em um feature store melhora a reprodutibilidade, rastreabilidade, colaboração e conformidade em fluxos de trabalho MLOps, tornando-o um componente essencial para o gerenciamento eficaz de pipelines de aprendizado de máquina.
Vamos começar a criar seu microsserviço de preços de AI! Para integrar perfeitamente os preços de AI em seu aplicativo, você precisará configurar os seguintes componentes:
- Conta do MongoDB Atlas: Configure um cluster, configure as configurações de segurança e conecte seu aplicativo.
- Conta do Google Cloud Platform: crie um projeto, habilite as APIs necessárias (por exemplo, Cloud Storage, Cloud Function, Pub/Sub, Vertex AI) e configure a CLI.
- Instalar Node.js e Express: Clone o repositório que contém o código do microsserviço para este tutorial, configure variáveis de ambiente e desenvolva a lógica de preços.
- Criar um cluster: entre na sua conta do MongoDB Atlas e crie um novo cluster. Escolha uma região mais próxima da sua base de usuários para obter o desempenho ideal.
- Configurar segurança: Configure as configuraçõesde segurança do seu cluster. Crie usuários do banco de dados com roles específicos e habilite a lista branca de IP para proteger sua conexão do banco de dados.
- Conecte-se ao seu cluster: use a connection string fornecida pelo Atlas para conectar seu aplicativo ao MongoDB database. Você precisará disso na configuração do microsserviço.
- Criar um projeto GCP: Faça login no console do Google Cloud e crie um novo projeto para seu microsserviço.
- Habilitar APIs: certifique-se de que as APIs necessárias estejam habilitadas para seu projeto. Neste microsserviço, estamos utilizando os serviços abaixo.
Dica: Verifique em "IAM & Admin " que você possui todas as permissões necessárias. Verifique se você tem privilégios de proprietário ou acesso granular para usuários específicos (com regras menos permissivas). Além disso, siga este guia para habilitar qualquer API e serviço no GCP.
Serviços | Explicação |
---|---|
Armazenamento em nuvem | Salvando dimensionadores de dados como arquivos {.joblib |
Função do Cloud | Orquestrando o fluxo de dados |
Pub/Sub | Ingestão de transmissão ao vivo de eventos de e-commerce para acoplar livremente microsserviços baseados em assinatura |
VertexAI | Notebook de treinamento e endpoint do modelo |
Armazenamento em nuvem:
- Pesquise na barra de pesquisa do Google Cloud Platform por "Cloud Storage"
- Você deve ver a seguinte tela indicando que o serviço está ativado. Se você for solicitado a ativar o serviço, faça isso.
Funções do Cloud Run:
- Pesquise na barra de pesquisa do Google Cloud Platform por "Cloud Run functions"
- Você deve ver a seguinte tela indicando que o serviço está ativado. Se você for solicitado a ativar o serviço, faça isso.
Pub/Sub:
- Pesquise na barra de pesquisa do Google Cloud Platform por "Pub/Sub"
- Você deve ver a seguinte tela indicando que o serviço está ativado. Se você for solicitado a ativar o serviço, faça isso.
VertexAI:
- Pesquise na barra de pesquisa do Google Cloud Platform por "VertexAI"
- Você deve ver a seguinte tela indicando que o serviço está ativado. Se você for solicitado a ativar o serviço, faça isso.
- Configurar CLI do GCP: instale e inicialize o CLI do Google Cloud. Autentique-se com sua conta GCP e defina seu projeto como padrão.
- Clonar o repositório: comece clonando o repositório com o código do microsserviço.
Abra seu terminal e execute os seguintes comandos:
1 git clone https://github.com/mongodb-industry-solutions/retail-store.git
Em seguida, navegue até o diretório que contém o microsserviço de preços dinâmicos:
1 cd retail-store/microservices/dynamicPricing
- Configurar pacotes Python: Quando estiver no diretório correto, digite o seguinte comando e pressione Enter:
1 pip install -r requirements.txt
- Configurar variáveis de ambiente: configure as variáveis de ambiente necessárias, incluindo sua connection string do MongoDB Atlas e quaisquer outras configurações específicas do serviço. As variáveis de ambiente são essenciais para gerenciar as definições de configuração, especialmente aquelas que contêm informações confidenciais.
Aqui está um modelo ilustrando como configurar variáveis de ambiente para um microsserviço que se conecta ao MongoDB Atlas:
Criando o
.env
arquivo com o seguinte conteúdo:1 MONGODB_URI=mongodb+srv://username:password@clusterName.mongodb.net/ 2 GOOGLE_APPLICATION_CREDENTIALS=my-google-credentials 3 GOOGLE_CLOUD_PROJECT=my-google-cloud 4 PUBSUB_TOPIC_ID=my-topic-id
Dica: substitua
username
e password
pelo seu nome de usuário e senha do MongoDB, clusterName
pelo nome do cluster MongoDB, my-google-credentials
pelo caminho para o arquivo de credenciais do aplicativo Google, my-google-cloud
pelo ID do projeto do Google Cloud e my-topic-id
com a ID do seu tópico do Google Cloud Pub/Sub.Configurar o tópico do Pub/Sub:
- Navegue até Pub/Sub em seu projeto do Google Cloud Platform.
- Forneça um nome exclusivo para o seu tópico no campo "ID do Tópico".
- Ajustar as configurações do tópico
- Selecione o método de criptografia que você prefere
- Finalize o processo clicando no botãoCriar
Desenvolva a lógica de precificação: modifique o serviço dynamicPricing para implementar seu algoritmo de precificação. Isso pode envolver a análise de dados históricos, considerando os preços dos concorrentes e integrando sinais de oferta e demanda em tempo real.
- Clique em +Create para criar um novo notebook.
Conecte-se diretamente ao seu cluster MongoDB para análise de dados ao vivo e treinamento de algoritmos: depois de criar um novo notebook, você pode usá-lo para se conectar diretamente ao seu cluster MongoDB usandoos seguintes trechos:
1 conn_string = "your_mongodb_connection_string_here" 2 client = MongoClient(conn_string)
1 db = client["your_database_name"] 2 collection = db["your_collection_name"]
Dica: substitua
"your_mongodb_connection_string_here"
por sua connection string real do MongoDB. Isso normalmente segue um formato semelhante a: mongodb+srv://<username>:<password>@<cluster-address>/<database-name>?retryWrites=true&w=majority
Certifique-se de substituir: <username>
e <password>
por suas credenciais do MongoDB. <cluster-address>
pelo endereço do seu cluster MongoDB. <database-name>
(opcional) com o banco de dados específico ao qual você deseja se conectar dentro do seu cluster. Além disso, substitua "your_database_name"
e "your_collection_name"
pelos nomes reais que você está usando na configuração do MongoDB.Isso permitirá que você extraia dados de seus clusters ao vivo e treine um algoritmo de preços. Nesse caso, usamos oTensorFlow para capturar como os preços mudam com base no comportamento do usuário.
Treine um modelo de rede neural do TensorFlow: agora que estamos conectados ao MongoDB, mostraremos um Jupyter Notebook projetado para uma loja de comércio eletrônico, semelhante ao da introdução. Sinta-se à vontade para modificá-lo de acordo com suas necessidades específicas. Este notebook demonstra como treinar um modelo de rede neural TensorFlow para prever preços ideais com base em eventos de comércio eletrônico armazenados em uma loja de recursos do MongoDB Atlas. Vamos começar.
Decidimos que a loja de comércio eletrônico tem o seguinte modelo de dados para capturar eventos de comportamento do usuário:
Campo | Tipo de Dados | Descrição | Valores de exemplo |
---|---|---|---|
user_name | String | O nome do produto | "MongoDB Notebook" |
product_id | Inteiro | Identificador exclusivo do produto | 98803 |
ação | String | Tipo de ação realizada no produto (interação com o usuário) | "view", "add_to_cart", "purchase" |
Preço | Float | Preço do produto | 18.99 |
timestamp | String | Carimbo de data/hora no formato ISO de quando o evento ocorreu | "2024-03-25T12:36:25.428461" |
encoded_name | Inteiro | Uma versão codificada do nome do produto para modelos de machine learning | 23363195 |
Esta tabela pressupõe que o campo product["price"] é um float que representa o preço do produto em uma única moeda (por exemplo, USD). O campo encoded_name é considerado um número inteiro, que pode representar um hash ou uma codificação usada para transformar o nome do produto em um formato numérico adequado para modelos de aprendizado de máquina. O campo de carimbo de data/hora é uma string formatada como carimbo de data/hora ISO, que fornece a data e a hora exatas em que a ação foi registrada. Os valores de exemplo são espaços reservados e devem ser substituídos por dados reais do seu aplicativo.
Configurando uma conexão MongoDB com Python: Primeiro, precisamos instalar os pacotes Python necessários e estabelecer uma conexão com nosso MongoDB database.
1 ```python 2 !pip install pymongo 3 !pip install 'pymongo[srv]' 4 !pip install pandas 5 from pymongo import MongoClient 6 import pandas as pd 7 import keras
1 # Replace the below connection string with your MongoDB connection URI 2 conn_string = "your_mongodb_connection_string_here" 3 client = MongoClient(conn_string)
1 # Specify the database and collection 2 db = client["your_database_name"] 3 collection = db["your_collection_name"]
Limpeza de dados: uma vez conectados, buscaremos os dados e executaremos algumas operações básicas de limpeza para prepará-los para o treinamento do modelo.
1 # Get all the documents 2 documents = collection.find() 3 4 # Convert the documents into a list and then into a DataFrame 5 df = pd.DataFrame(list(documents)) 6 7 # Drop unnecessary columns 8 df = df.drop(columns=['product_name', 'product_id', 'timestamp', 'tensor']) 9 10 # Extracting the 'amount' from the 'price' column and converting it to float 11 df['price'] = df['price'].apply(lambda x: float(x['amount']) if isinstance(x, dict) and 'amount' in x else None) 12 13 df = df.dropna()
Construindo o modelo de precificação dinâmica: Em seguida, importamos as bibliotecas necessárias do TensorFlow e do scikit-learn, codificamos variáveis categóricas e normalizamos nossos dados.
1 import tensorflow as tf 2 from sklearn.model_selection import train_test_split 3 from sklearn.preprocessing import LabelEncoder, MinMaxScaler, StandardScaler 4 from tensorflow.keras.models import Sequential 5 from tensorflow.keras.layers import Dense 6 from tensorflow.keras.layers import Dropout
1 # Encode categorical variables 2 label_encoders = {} 3 for column in ['action', 'encoded_name']: 4 le = LabelEncoder() 5 df[column] = le.fit_transform(df[column]) 6 label_encoders[column] = le 7 8 df.head()
1 # Standardizing 2 scaler = StandardScaler() 3 df[['action', 'encoded_name']] = scaler.fit_transform(df[['action', 'encoded_name']]) 4 5 df.head()
Salvando o codificador para pré-processamento de dados de eventos: salvaremos os objetos do codificador no Google Cloud Storage para uso posterior no pré-processamento de novos dados para previsões. Esse código gerará arquivos joblib para salvar os critérios de codificação e padronização do pré-processamento acima e do treinamento futuro.
1 !pip install google-cloud-storage 2 from google.cloud import storage 3 import joblib 4 import io
1 # Initialize a client 2 storage_client = storage.Client() 3 4 # The name of your GCP bucket 5 bucket_name = 'dyn_pricing_scaler' 6 7 # The path within your bucket to save the scaler object 8 destination_blob_name = 'labelEncoder.joblib' 9 10 # Create a buffer 11 buffer = io.BytesIO() 12 13 # Dump the scaler object to the buffer 14 joblib.dump(label_encoders, buffer) 15 16 # Now upload the buffer content to GCS 17 bucket = storage_client.bucket(bucket_name) 18 blob = bucket.blob(destination_blob_name) 19 20 # Rewind the buffer's file pointer to the beginning of the file 21 buffer.seek(0) 22 23 # Upload the contents of the buffer 24 blob.upload_from_file(buffer, content_type='application/octet-stream') 25 26 print(f"Uploaded scaler to gs://{bucket_name}/{destination_blob_name}")
Treinamento do modelo: com nossos dados preparados, vamos dividi-los em conjuntos de treinamento e teste, definir nossa arquitetura de rede neural e treinar nosso modelo. Lembre-se de que este é um modelo destinado a uma demonstração simples.
1 from tensorflow.keras.models import Sequential 2 from tensorflow.keras.layers import Dense 3 from tensorflow.keras.optimizers import Adam 4 5 # Splitting data into training and testing sets 6 X = df.drop('price', axis=1) 7 y = df['price'] 8 X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) 9 10 # Define the model 11 model = Sequential([ 12 Dense(64, activation='relu', input_shape=(X_train.shape[1],)), 13 Dense(64, activation='relu'), 14 Dense(1) # Output layer for regression 15 ]) 16 17 # Compile the model 18 model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae']) 19 20 # Print the model summary to check the architecture 21 model.summary() 22 23 # Train the model 24 history = model.fit(X_train, y_train, 25 validation_split=0.2, # Further split the training set for validation 26 epochs=10, # Number of epochs to train for 27 verbose=1, # Show training output 28 ) 29 30 # Evaluate the model on the test set 31 test_loss, test_mae = model.evaluate(X_test, y_test, verbose=1) 32 print(f"Test Loss: {test_loss}, Test MAE: {test_mae}")
Previsão de teste: após o treinamento, fazemos uma previsão de teste para verificar o desempenho do modelo.
1 import numpy as np 2 3 # Example new data point converted into a tensor 4 new_data = np.array([[1.225999, -0.957973]]) 5 6 predicted_price = model.predict(new_data) 7 print("Predicted Price:", predicted_price[0])
Salvando o modelo: Por fim, salvaremos nosso modelo treinado no Google Cloud Storage.
1 from google.colab import auth 2 auth.authenticate_user() 3 4 project_id = 'your-gcp-project-id' 5 !gcloud config set project {project_id} 6 7 model_dir = 'your-model-directory' 8 9 # Save the model to GCS 10 model_dir = f'gs://{bucket_name}/{model_dir} 11 model.save(model_dir) 12 13 bucket_name = 'your-cloud-storage-bucket-name' 14 model_dir = 'your-model-directory' 15 model_path = 'your-model-path' 16 !gsutil mb -l us-central1 gs://{bucket_name} # Create the bucket if necessary 17 !gsutil cp -r {model_dir} gs://{bucket_name}/{model_path}
Registrando o modelo na Vertex AI: Em seguida, registraremos nosso modelo treinamento no registro demodelo da VertexAI:
1 from google.cloud import aiplatform 2 3 aiplatform.init(project='your-gcp-project-id', location='your-gcp-region') 4 5 #Model registry 6 7 model_display_name = 'dyn_pricingv1' 8 model_description = 'TensorFlow dynamic pricing model' 9 bucket_name = 'your-gcp-bucket-name' 10 model_path = 'your-model-path' 11 12 model = aiplatform.Model.upload( 13 display_name=model_display_name, 14 artifact_uri=f'gs://{bucket_name}/{model_path}', 15 serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest', 16 description=model_description, 17 )
Parabéns! Agora você deve ver seu modelo listado no Registro de modelo da Vertex AI. É aqui que você gerenciará e implantará seus modelos para vários aplicativos. Agora, precisamos treinar um modelo de rede causal do TensorFlow para prever um preço ideal com base em eventos de e-commerce armazenados em uma loja de recursos do MongoDB Atlas.
Você deve implantar um modelo em um endpoint antes que esse modelo possa ser usado para servir previsões online. A implantação de um modelo associa recursos físicos ao modelo para que ele possa servir previsões online com baixa latência. Aqui estão as etapas necessárias:
- No console do GCP, na seção AI, acesse a páginaModelos.
- Clique no nome e ID da versão do modelo que deseja implantar para abrir a página de detalhes (modelo da última etapa).
- Selecione a aba Implementar e testar .
- Clique em Implementar no endpoint.
- Preencha o restante dos parâmetros (Configurações do modelo, Monitoramento do modelo).
- Clique em Deploy.
Em seguida, no painel Vertex AI:
- Clique em Endpoints e selecione seu modelo implantado.
- Obtenha o ID do endpoint na página de detalhes, pois você precisará dele para configurar o Cloud Function para enviar solicitações de predição para esse endpoint.
configuração do cloudFunction: O Google CloudFunction orquestrará a conversão de dados de eventos em tensores e sua entrada na coleção do armazenamento de recursos, além de invocar o modelo de endpoint do VertexAI. Siga estas etapas:
Navegue até Cloud Functions em seu projeto GCP e clique em CREATE FUNCTION.
Certifique-se de que o trigger para sua função de cloud seja o tópico Pub/Sub criado anteriormente.
No arquivo
main.py
visto abaixo, copie e cole o seguinte trecho de código Python:1 from pymongo import MongoClient 2 from datetime import datetime, timedelta 3 import json 4 import numpy as np 5 import pandas as pd 6 import joblib 7 import os 8 from google.cloud import storage 9 import functions_framework 10 import base64 11 from bson import ObjectId 12 from google.cloud import aiplatform 13 14 @functions_framework.cloud_event 15 def hello_pubsub(cloud_event): 16 event_data = base64.b64decode(cloud_event.data["message"]["data"]) 17 print(event_data) 18 event_data = json.loads(event_data) 19 print(event_data) 20 event_data = pd.DataFrame([event_data]) 21 print(event_data) 22 23 # Correctly handle the '_id' field when it's a string, getting the event_id and product_id data 24 event_id = ObjectId(str(event_data['_id'].loc[0])) 25 print(event_id) 26 product_id = event_data['product_id'].loc[0] 27 print(product_id) 28 29 # Set your GCS bucket and file path 30 bucket_name = 'dyn_pricing_scaler' 31 scaler_file_name = 'scaler.joblib' 32 label_encoder_name = 'labelEncoder.joblib' 33 34 # Initialize a GCS client 35 storage_client = storage.Client() 36 37 # Get the bucket 38 bucket = storage_client.bucket(bucket_name) 39 40 # Get the blob (file) containing the scaler 41 blob1 = bucket.blob(scaler_file_name) 42 43 # Download the scaler file to a temporary location 44 scaler_temp_file_path = '/tmp/' + scaler_file_name 45 blob1.download_to_filename(scaler_temp_file_path) 46 47 # Download the label encoder to a temporary location 48 blob2 = bucket.blob(label_encoder_name) 49 label_encoder_temp = '/tmp/' + label_encoder_name 50 blob2.download_to_filename(label_encoder_temp) 51 52 # Clean unnecessary columns 53 if '_id' in event_data.columns: 54 event_data = event_data.drop(columns=['_id']) 55 print(event_data) 56 event_data.drop('price', axis=1, inplace=True) 57 event_data.drop('timestamp', axis=1, inplace=True) 58 print(event_data) 59 event_data.drop('product_id', axis=1, inplace=True) 60 print(event_data) 61 event_data.drop('product_name', axis=1, inplace=True) 62 print(event_data) 63 64 #Load label encoder 65 label_encoders = joblib.load(label_encoder_temp) 66 print(label_encoders) 67 68 #Encode categorical fields 69 for column, encoder in label_encoders.items(): 70 event_data[column] = encoder.transform(event_data[column]) 71 print(event_data) 72 73 74 # Load the scaler using joblib 75 scaler = joblib.load(scaler_temp_file_path) 76 print(scaler) 77 78 79 # Use scaler 80 columns_to_scale = ['action', 'encoded_name'] 81 event_data[columns_to_scale] = scaler.transform(event_data[columns_to_scale]) 82 print(event_data) 83 84 #Prepare data as tensors 85 #first_row_scaled = event_data[0, :].reshape(1, -1) 86 #print(first_row_scaled) 87 event_data = event_data.to_numpy() 88 89 # Prepare input data for VertexAI 90 input_data = {"instances": event_data} 91 print(input_data) 92 input_data['instances'] = input_data['instances'].tolist() 93 print(input_data) 94 input_data_json = json.dumps(input_data) 95 print(input_data_json) 96 97 # Call VertexAI endpoint 98 endpoint_id = "your-endpoint-id" 99 project_id = "your-project-id" 100 location = "your-project-location" 101 endpoint_url = f"https://us-central1-aiplatform.googleapis.com/v1/projects/{project_id}/locations/us-central1/endpoints/{endpoint_id}:predict" 102 103 aiplatform.init(project=project_id, location=location) 104 105 endpoint = aiplatform.Endpoint(endpoint_id) 106 107 prediction = endpoint.predict(instances=input_data['instances']) 108 109 print(prediction) 110 111 pred_price = prediction.predictions[0][0] 112 113 print(pred_price) 114 115 #Start MongoClient 116 mongo_uri = "your-mongo-db-connection-uri" #remember you can set it up in a .env file 117 client = MongoClient(mongo_uri) 118 119 db = client["dotLocalStore"] # Replace "your_database" with your actual database name 120 collection = db["products"] 121 feature_store = db["events"] 122 123 # Update products collection document with 'pred_price' 124 collection.update_one({"id": int(product_id)}, {"$set": {"pred_price": pred_price}}) 125 126 # Update events collection (feature_store) with event tensor 127 tensor = event_data.tolist() 128 print(tensor) 129 feature_store.update_one({"_id": event_id}, {"$set": {"tensor": tensor }}) 130 131 # Clean up: Delete the temporary files 132 os.remove(scaler_temp_file_path) 133 os.remove(label_encoder_temp) 134 print(pred_price) 135 136 return pred_price
Certifique-se de adicionar o
requirements.txt
visto abaixo na estrutura da pasta Cloud Function no GCP:1 functions-framework==3.* 2 pymongo 3 scikit-learn==1.2.2 4 numpy 5 google-cloud-aiplatform 6 google-auth 7 google-cloud-storage 8 pandas 9 joblib
Simulação de eventos de clientes: se você deseja imitar o comportamento do cliente, fique à vontade para usar o script Python chamado
generator.py
visto abaixo.1 import random 2 import pymongo 3 from pymongo import MongoClient 4 import os 5 import time 6 from google.cloud import pubsub_v1 7 from dotenv import load_dotenv 8 from faker import Faker 9 from datetime import datetime, timedelta 10 import json 11 from bson import ObjectId 12 import hashlib 13 14 # Load environment variables 15 load_dotenv() 16 MONGODB_URI = os.getenv('MONGODB_URI') 17 GOOGLE_APPLICATION_CREDENTIALS = os.getenv('GOOGLE_APPLICATION_CREDENTIALS') 18 GOOGLE_CLOUD_PROJECT = os.getenv('GOOGLE_CLOUD_PROJECT') 19 PUBSUB_TOPIC_ID = os.getenv('PUBSUB_TOPIC_ID') 20 21 # Initialize MongoDB client 22 mongo_client = pymongo.MongoClient(MONGODB_URI) 23 db = mongo_client["dotLocalStore"] 24 behaviors_collection = db["events"] 25 products_collection = db["products"] 26 27 # Initialize Google Cloud Pub/Sub publisher 28 publisher = pubsub_v1.PublisherClient() 29 topic_path = publisher.topic_path(GOOGLE_CLOUD_PROJECT, PUBSUB_TOPIC_ID) 30 31 def fetch_products(): 32 """Fetch products from MongoDB""" 33 return list(products_collection.find({})) 34 35 def generate_ecommerce_behavior(product): 36 """Generate a single synthetic ecommerce behavior data for a given product""" 37 # Encode product_name into a numerical field 38 encoded_name = int(hashlib.sha256(product["name"].encode('utf-8')).hexdigest(), 16) % 10**8 39 behavior = { 40 "product_name": product["name"], 41 "product_id": product["id"], 42 "action": random.choice(["view", "add_to_cart", "purchase"]), 43 "price": product["price"], 44 "timestamp": datetime.now().isoformat(), # Format timestamp for JSON serialization 45 "encoded_name": encoded_name 46 } 47 return behavior 48 49 # Custom JSON Encoder that converts ObjectId to str 50 class JSONEncoder(json.JSONEncoder): 51 def default(self, o): 52 if isinstance(o, ObjectId): 53 return str(o) 54 return json.JSONEncoder.default(self, o) 55 56 def push_event_to_mongodb(behavior): 57 """Push a single ecommerce behavior data to MongoDB""" 58 behaviors_collection.insert_one(behavior) 59 print("Pushed an event to MongoDB.") 60 61 def push_event_to_pubsub(event): 62 """Push a single event to Google Cloud Pub/Sub""" 63 try: 64 # Attempt to serialize the event to JSON 65 data = json.dumps(event, cls=JSONEncoder).encode("utf-8") 66 # Attempt to publish the serialized data to Pub/Sub 67 future = publisher.publish(topic_path, data=data) 68 print(f"Published id:{event['product_id']} product:{event['product_name']} to Pub/Sub.") 69 future.result() # Block until the publish completes 70 except Exception as e: 71 print(f"An error occurred: {e}") 72 73 if __name__ == "__main__": 74 products = fetch_products() 75 num_behaviors_per_cycle = 150 76 77 try: 78 while True: 79 for _ in range(num_behaviors_per_cycle): 80 # Select a random product 81 selected_product = random.choice(products) 82 # Generate behavior for the selected product 83 behavior = generate_ecommerce_behavior(selected_product) 84 # Push the behavior to MongoDB 85 push_event_to_mongodb(behavior) 86 # Push the behavior to Pub/Sub 87 push_event_to_pubsub(behavior) 88 # Wait for 3 seconds before generating the next behavior 89 time.sleep(3) 90 except KeyboardInterrupt: 91 print("Stopped by the user.")
O script python gera eventos falsos do cliente com base no modelo de dados explicado. Esses eventos serão enviados para um tópico do Pub/Sub e sua coleção da loja de recursos do Atlas. Você pode ajustar o número de eventos e a cadência deles diretamente no código. Para executar este script, use o seguinte comando:
1 python3 generator.py
Depois de executar esse script, você poderá ver eventos falsos de clientes sendo inseridos no cluster do MongoDB Atlas e no tópico do Pub/Sub, acionando efetivamente o microsserviço para responder a esses eventos e calcular os pontos de preço ideais para os diferentes produtos.
Você já dominou a criação de um microsserviço de preços dinâmicos reativo? ótimo trabalho! Veja o que você aprenderam:
- Armazenamento de recursos centralizado: o MongoDB serve como um armazenamento de recursos, atuando como um repositório centralizado especificamente projetado para armazenar, gerenciar e servir recursos para modelos de aprendizado de máquina (ML). Seus recursos polimórficos permitem a utilização de uma única interface para representar vários tipos de dados. Isso implica que, à medida que novos recursos são introduzidos ou os modelos de preços evoluem, o MongoDB pode gerenciar habilidosamente diversos tipos de dados dentro do mesmo sistema. No contexto da precificação dinâmica, esse recurso facilita a incorporação contínua de novos fatores ou variáveis de precificação sem causar interrupções nas estruturas ou operações de dados existentes.
- Escalabilidade e eficiência: o Google Cloud Pub/Sub pode lidar com grandes volumes de dados de clientes com eficiência, garantindo escalabilidade para aplicativos do mundo real. Embora esse microsserviço simule apenas 25 eventos do cliente a cada três segundos, o Pub/Sub é capaz de processar fluxos de dados muito maiores.
- Atualizações de preços em tempo real: as funções da nuvem trigger os modelos TensorFlow para gerar preços dinâmicos com base no comportamento do cliente. Esses preços gerados são então inseridos ou atualizados (upserted) de volta à coleção do catálogo de produtos no MongoDB. Isso permite ajustes em tempo real no aplicativo de e-commerce, pois o front-end do aplicativo recupera dados diretamente da mesma collection.
Curioso para saber como o MongoDB está mudando o cenário do varejo? Mergulhe mais fundo nos recursos do MongoDB e veja como ele está redefinindo o setor:
O MongoDB ajuda os varejistas a inovar e obter vantagem competitiva. Inscreva-se em um workshop de inovação para explorar as possibilidades com nossos especialistas. Se você quiser se conectar com outras pessoas que usam o MongoDB para construir seu próximo grande projeto, acesse a Comunidade de desenvolvedores do MongoDB.