In the second part of our series on building a RAG application on a Raspberry Pi, we’ll expand on the foundation we laid in the first part, where we created and tested the core pipeline. Now, we’re going to take things a step further by building a FastAPI application to serve our RAG pipeline and creating a Reflex app to give users a simple and interactive way to access it. This part will guide you through setting up the FastAPI back-end, designing the front-end with Reflex, and getting everything up and running on your Raspberry Pi. By the end, you’ll have a complete, working application that’s ready for real-world use.
If you missed the previous edition, be sure to check it out here: Self-Hosting RAG Applications on Edge Devices with Langchain and Ollama – Part I.
Before we start creating the application, we need to set up the environment. Create a virtual environment and install the dependencies below:
deeplake
boto3==1.34.144
botocore==1.34.144
fastapi==0.110.3
gunicorn==22.0.0
httpx==0.27.0
huggingface-hub==0.23.4
langchain==0.2.6
langchain-community==0.2.6
langchain-core==0.2.11
langchain-experimental==0.0.62
langchain-text-splitters==0.2.2
langsmith==0.1.83
marshmallow==3.21.3
numpy==1.26.4
pandas==2.2.2
pydantic==2.8.2
pydantic_core==2.20.1
PyMuPDF==1.24.7
PyMuPDFb==1.24.6
python-dotenv==1.0.1
pytz==2024.1
PyYAML==6.0.1
reflex==0.5.6
requests==2.32.3
reflex-hosting-cli==0.1.13
Once the required packages are installed, we need to have the required models present on the device. We will do this using Ollama. Follow the steps from Part 1 of this article to download both the language and embedding models. Finally, create two directories for the back-end and front-end applications.
Once the models are pulled using Ollama, we are ready to build the final application.
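Optionally, you can run a quick sanity check to confirm that Ollama is serving both models before building the app. The snippet below is a minimal sketch, assuming Ollama is running on its default port (11434) and that the phi3 and nomic-embed-text models from our config have already been pulled:

# sanity_check.py -- a quick check that Ollama serves both models.
from langchain.embeddings.ollama import OllamaEmbeddings
from langchain_community.llms.ollama import Ollama

OLLAMA_URL = "http://localhost:11434"

embeddings = OllamaEmbeddings(model="nomic-embed-text", base_url=OLLAMA_URL)
llm = Ollama(model="phi3", base_url=OLLAMA_URL)

# Embed a short string and print the vector dimension.
vector = embeddings.embed_query("hello raspberry pi")
print(f"Embedding dimension: {len(vector)}")

# Ask the language model for a short reply.
print(llm.invoke("Reply with one short sentence."))

If both calls succeed, the device is ready to serve the pipeline.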
In Part 1 of this article, we built the RAG pipeline with both the Ingestion and QnA modules and tested them on a few documents to confirm everything worked as expected. Now we need to wrap the pipeline with FastAPI to create a consumable API. This will help us integrate it with any front-end framework like Streamlit, Chainlit, Gradio, Reflex, React, Angular, etc. Let’s start by building a structure for the application. Following this structure is completely optional, but make sure to check the dependency imports if you use a different structure for the app.
Below is the tree structure we will follow:
backend
├── app.py
├── requirements.txt
└── src
├── config.py
├── doc_loader
│ ├── base_loader.py
│ ├── __init__.py
│ └── pdf_loader.py
├── ingestion.py
├── __init__.py
└── qna.py
Let’s start with config.py. This file will contain all the configurable options for the application, such as the Ollama URL, the LLM name, and the embeddings model name. Below is an example:
LANGUAGE_MODEL_NAME = "phi3"
EMBEDDINGS_MODEL_NAME = "nomic-embed-text"
OLLAMA_URL = "http://localhost:11434"
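Since python-dotenv is part of our requirements, these values can optionally be read from a .env file instead of being hard-coded. The following is a hypothetical variant of config.py that falls back to the same defaults; the hard-coded version above works just as well:

# Optional config.py variant that reads settings from a .env file.
import os
from dotenv import load_dotenv

load_dotenv()  # loads a .env file from the working directory, if present

LANGUAGE_MODEL_NAME = os.getenv("LANGUAGE_MODEL_NAME", "phi3")
EMBEDDINGS_MODEL_NAME = os.getenv("EMBEDDINGS_MODEL_NAME", "nomic-embed-text")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")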
The base_loader.py file contains the parent document loader class that child document loaders will inherit. In this application we are only working with PDF files, so a child PDFLoader class will be created that inherits from the BaseLoader class.
Below are the contents of base_loader.py and pdf_loader.py:
# base_loader.py
from abc import ABC, abstractmethod
class BaseLoader(ABC):
def __init__(self, file_path: str) -> None:
self.file_path = file_path
@abstractmethod
async def load_document(self):
pass
# pdf_loader.py
import os
from .base_loader import BaseLoader
from langchain.schema import Document
from langchain.document_loaders.pdf import PyMuPDFLoader
from langchain.text_splitter import CharacterTextSplitter
class PDFLoader(BaseLoader):
def __init__(self, file_path: str) -> None:
super().__init__(file_path)
async def load_document(self):
self.file_name = os.path.basename(self.file_path)
loader = PyMuPDFLoader(file_path=self.file_path)
text_splitter = CharacterTextSplitter(
separator="\n",
chunk_size=1000,
chunk_overlap=200,
)
pages = await loader.aload()
total_pages = len(pages)
chunks = []
for idx, page in enumerate(pages):
chunks.append(
Document(
page_content=page.page_content,
metadata=dict(
{
"file_name": self.file_name,
"page_no": str(idx + 1),
"total_pages": str(total_pages),
}
),
)
)
final_chunks = text_splitter.split_documents(chunks)
return final_chunks
We discussed the working of the PDF loader in Part 1 of this article.
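If you want to try the loader on its own, here is a minimal sketch. It assumes a sample file at the hypothetical path data/sample.pdf and is run from the src directory so the doc_loader package is importable:

# quick_loader_test.py -- standalone check for PDFLoader.
import asyncio

from doc_loader import PDFLoader

async def main():
    loader = PDFLoader(file_path="data/sample.pdf")
    chunks = await loader.load_document()
    print(f"Created {len(chunks)} chunks")
    # Each chunk carries the file name, page number, and total pages.
    print(chunks[0].metadata)

if __name__ == "__main__":
    asyncio.run(main())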
Next, let’s build the Ingestion class. This is the same as the one we built in Part 1 of this article.
import os
import config as cfg
from langchain.vectorstores.deeplake import DeepLake
from langchain.embeddings.ollama import OllamaEmbeddings
from .doc_loader import PDFLoader
class Ingestion:
"""Document Ingestion pipeline."""
def __init__(self):
try:
self.embeddings = OllamaEmbeddings(
model=cfg.EMBEDDINGS_MODEL_NAME,
base_url=cfg.OLLAMA_URL,
show_progress=True,
)
self.vector_store = DeepLake(
dataset_path="data/text_vectorstore",
embedding=self.embeddings,
num_workers=4,
verbose=False,
)
except Exception as e:
raise RuntimeError(f"Failed to initialize Ingestion system. ERROR: {e}")
async def create_and_add_embeddings(
self,
file: str,
):
try:
loader = PDFLoader(
file_path=file,
)
chunks = await loader.load_document()
size = await self.vector_store.aadd_documents(documents=chunks)
return len(size)
except (ValueError, RuntimeError, KeyError, TypeError) as e:
raise Exception(f"ERROR: {e}")
Now that we have set up the Ingestion class, we’ll move on to creating the QnA class. This too is the same as the one we created in Part 1 of this article.
import os
import config as cfg
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate
from langchain.vectorstores.deeplake import DeepLake
from langchain.embeddings.ollama import OllamaEmbeddings
from langchain_community.llms.ollama import Ollama
from .doc_loader import PDFLoader
class QnA:
"""Document Ingestion pipeline."""
def __init__(self):
try:
self.embeddings = OllamaEmbeddings(
model=cfg.EMBEDDINGS_MODEL_NAME,
base_url=cfg.OLLAMA_URL,
show_progress=True,
)
self.model = Ollama(
model=cfg.LANGUAGE_MODEL_NAME,
base_url=cfg.OLLAMA_URL,
verbose=True,
temperature=0.2,
)
self.vector_store = DeepLake(
dataset_path="data/text_vectorstore",
embedding=self.embeddings,
num_workers=4,
verbose=False,
)
self.retriever = self.vector_store.as_retriever(
search_type="similarity",
search_kwargs={
"k": 10,
},
)
except Exception as e:
raise RuntimeError(f"Failed to initialize Ingestion system. ERROR: {e}")
def create_rag_chain(self):
try:
system_prompt = """<Instructions>\n\nContext: {context}"
"""
prompt = ChatPromptTemplate.from_messages(
[
("system", system_prompt),
("human", "{input}"),
]
)
question_answer_chain = create_stuff_documents_chain(self.model, prompt)
rag_chain = create_retrieval_chain(self.retriever, question_answer_chain)
return rag_chain
except Exception as e:
raise RuntimeError(f"Failed to create retrieval chain. ERROR: {e}")
With this, we have finished building the core functionality of the RAG app. Now let’s wrap it with FastAPI.
import sys
import os
import uvicorn
from src import QnA, Ingestion
from fastapi import FastAPI, Request, File, UploadFile
from fastapi.responses import StreamingResponse
app = FastAPI()
ingestion = Ingestion()
chatbot = QnA()
rag_chain = chatbot.create_rag_chain()
@app.get("/")
def hello():
return {"message": "API Running in server 8089"}
@app.post("/query")
async def ask_query(request: Request):
data = await request.json()
question = data.get("question")
async def event_generator():
for chunk in rag_chain.pick("answer").stream({"input": question}):
yield chunk
return StreamingResponse(event_generator(), media_type="text/plain")
@app.post("/ingest")
async def ingest_document(file: UploadFile = File(...)):
try:
os.makedirs("files", exist_ok=True)
file_location = f"files/{file.filename}"
with open(file_location, "wb+") as file_object:
file_object.write(file.file.read())
size = await ingestion.create_and_add_embeddings(file=file_location)
return {"message": f"File ingested! Document count: {size}"}
except Exception as e:
return {"message": f"An error occured: {e}"}
if __name__ == "__main__":
try:
uvicorn.run(app, host="0.0.0.0", port=8089)
except KeyboardInterrupt as e:
print("App stopped!")
Let’s break down the app endpoint by endpoint:
GET / is a simple health-check route that confirms the API is up.
POST /query accepts a JSON body containing a question, runs it through the RAG chain, and streams the generated answer back as plain text.
POST /ingest accepts an uploaded file, saves it under the files directory, and passes it to the Ingestion pipeline to be chunked and embedded.
Finally, we run the app with the uvicorn package, specifying the host and port. To test the app, simply run it using the following command:
python app.py
Use an API testing tool like Postman, Insomnia, or Bruno to test the application. You can also use the Thunder Client extension for VS Code to do the same.
Testing the Ingestion endpoint:
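If you prefer testing from Python instead of a GUI client, a small requests sketch like the one below can exercise the ingest endpoint. It assumes the API is reachable on port 8089 and uses a hypothetical sample.pdf in the current directory:

import requests

# Upload a local PDF to the /ingest endpoint.
with open("sample.pdf", "rb") as f:
    response = requests.post(
        "http://localhost:8089/ingest",
        files={"file": ("sample.pdf", f, "application/pdf")},
    )
print(response.json())  # e.g. {"message": "File ingested! Document count: ..."}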
Testing the query endpoint:
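Similarly, the streaming query endpoint can be checked with a short sketch that prints the answer chunks as they arrive (same host and port assumed):

import requests

payload = {"question": "What is this document about?"}
with requests.post("http://localhost:8089/query", json=payload, stream=True) as response:
    # Print each chunk of the streamed answer as soon as it arrives.
    for chunk in response.iter_content(chunk_size=512):
        if chunk:
            print(chunk.decode(), end="", flush=True)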
We have successfully created a FastAPI app for the back-end of our RAG application. It’s time to build our front-end. You can choose any front-end library for this, but for this particular article we will build the front-end using Reflex. Reflex is a Python-only front-end library created for building web applications purely in Python. It provides us with templates for common applications like a calculator, image generation, and a chatbot. We will use the chatbot application template as the starting point for our user interface. Our final app will have the following structure, so let’s keep it here for reference.
We will have a frontend directory for this:
frontend
├── assets
│ └── favicon.ico
├── docs
│ └── demo.gif
├── chat
│ ├── components
│ │ ├── chat.py
│ │ ├── file_upload.py
│ │ ├── __init__.py
│ │ ├── loading_icon.py
│ │ ├── modal.py
│ │ └── navbar.py
│ ├── __init__.py
│ ├── chat.py
│ └── state.py
├── requirements.txt
├── rxconfig.py
└── uploaded_files
Follow these steps to prepare the groundwork for the final app. First, clone the chat template and initialize the Reflex project:
git clone https://github.com/reflex-dev/reflex-chat.git .
reflex init
This will set up the Reflex app, ready to run and develop. To confirm everything works, start it with the following command:
reflex run
Let’s start modifying the components. First, let’s modify the chat.py file inside the components directory. Below is the code for it:
import reflex as rx
from reflex_demo.components import loading_icon
from reflex_demo.state import QA, State
message_style = dict(
display="inline-block",
padding="0 10px",
border_radius="8px",
max_width=["30em", "30em", "50em", "50em", "50em", "50em"],
)
def message(qa: QA) -> rx.Component:
"""A single question/answer message.
Args:
qa: The question/answer pair.
Returns:
A component displaying the question/answer pair.
"""
return rx.box(
rx.box(
rx.markdown(
qa.question,
background_color=rx.color("mauve", 4),
color=rx.color("mauve", 12),
**message_style,
),
text_align="right",
margin_top="1em",
),
rx.box(
rx.markdown(
qa.answer,
background_color=rx.color("accent", 4),
color=rx.color("accent", 12),
**message_style,
),
text_align="left",
padding_top="1em",
),
width="100%",
)
def chat() -> rx.Component:
"""List all the messages in a single conversation."""
return rx.vstack(
rx.box(rx.foreach(State.chats[State.current_chat], message), width="100%"),
py="8",
flex="1",
width="100%",
max_width="50em",
padding_x="4px",
align_self="center",
overflow="hidden",
padding_bottom="5em",
)
def action_bar() -> rx.Component:
"""The action bar to send a new message."""
return rx.center(
rx.vstack(
rx.chakra.form(
rx.chakra.form_control(
rx.hstack(
rx.input(
rx.input.slot(
rx.tooltip(
rx.icon("info", size=18),
content="Enter a question to get a response.",
)
),
placeholder="Type something...",
id="question",
width=["15em", "20em", "45em", "50em", "50em", "50em"],
),
rx.button(
rx.cond(
State.processing,
loading_icon(height="1em"),
rx.text("Send", font_family="Ubuntu"),
),
type="submit",
),
align_items="center",
),
is_disabled=State.processing,
),
on_submit=State.process_question,
reset_on_submit=True,
),
rx.text(
"ReflexGPT may return factually incorrect or misleading responses. Use discretion.",
text_align="center",
font_size=".75em",
color=rx.color("mauve", 10),
font_family="Ubuntu",
),
rx.logo(margin_top="-1em", margin_bottom="-1em"),
align_items="center",
),
position="sticky",
bottom="0",
left="0",
padding_y="16px",
backdrop_filter="auto",
backdrop_blur="lg",
border_top=f"1px solid {rx.color('mauve', 3)}",
background_color=rx.color("mauve", 2),
align_items="stretch",
width="100%",
)
The changes compared to the version that ships natively with the template are minimal.
Next, we will edit the main chat.py file at the root of the chat package, which defines the pages and the app itself. Below is the code for it:
import reflex as rx
from reflex_demo.components import chat, navbar, upload_form
from reflex_demo.state import State
@rx.page(route="/chat", title="RAG Chatbot")
def chat_interface() -> rx.Component:
return rx.chakra.vstack(
navbar(),
chat.chat(),
chat.action_bar(),
background_color=rx.color("mauve", 1),
color=rx.color("mauve", 12),
min_height="100vh",
align_items="stretch",
spacing="0",
)
@rx.page(route="/", title="RAG Chatbot")
def index() -> rx.Component:
return rx.chakra.vstack(
navbar(),
upload_form(),
background_color=rx.color("mauve", 1),
color=rx.color("mauve", 12),
min_height="100vh",
align_items="stretch",
spacing="0",
)
# Add state and page to the app.
app = rx.App(
theme=rx.theme(
appearance="dark",
accent_color="jade",
),
stylesheets=["https://fonts.googleapis.com/css2?family=Ubuntu&display=swap"],
style={
"font_family": "Ubuntu",
},
)
app.add_page(index)
app.add_page(chat_interface)
This is the code for the chat interface. We have only added the font family to the app config; the rest of the code is the same.
Next, let’s edit the state.py file. This is where the front-end makes calls to the API endpoints and handles the responses.
import requests
import reflex as rx
class QA(rx.Base):
question: str
answer: str
DEFAULT_CHATS = {
"Intros": [],
}
class State(rx.State):
chats: dict[str, list[QA]] = DEFAULT_CHATS
current_chat = "Intros"
url: str = "http://localhost:8089/query"
question: str
processing: bool = False
new_chat_name: str = ""
def create_chat(self):
"""Create a new chat."""
# Add the new chat to the list of chats.
self.current_chat = self.new_chat_name
self.chats[self.new_chat_name] = []
def delete_chat(self):
"""Delete the current chat."""
del self.chats[self.current_chat]
if len(self.chats) == 0:
self.chats = DEFAULT_CHATS
self.current_chat = list(self.chats.keys())[0]
def set_chat(self, chat_name: str):
"""Set the name of the current chat.
Args:
chat_name: The name of the chat.
"""
self.current_chat = chat_name
@rx.var
def chat_titles(self) -> list[str]:
"""Get the list of chat titles.
Returns:
The list of chat names.
"""
return list(self.chats.keys())
async def process_question(self, form_data: dict[str, str]):
# Get the question from the form
question = form_data["question"]
# Check if the question is empty
if question == "":
return
model = self.openai_process_question
async for value in model(question):
yield value
async def openai_process_question(self, question: str):
"""Get the response from the API.
Args:
            question: The current question.
"""
# Add the question to the list of questions.
qa = QA(question=question, answer="")
self.chats[self.current_chat].append(qa)
payload = {"question": question}
# Clear the input and start the processing.
self.processing = True
yield
response = requests.post(self.url, json=payload, stream=True)
        # Stream the results, yielding after every chunk so the UI updates live.
        for chunk in response.iter_content(chunk_size=512):
            # Decode each chunk and append it to the latest answer,
            # skipping any empty keep-alive chunks.
            if chunk:
                self.chats[self.current_chat][-1].answer += chunk.decode()
            self.chats = self.chats
            yield
# Toggle the processing flag.
self.processing = False
In this file, we have defined the URL for the query endpoint. We have also modified the openai_process_question method to send a POST request to the query endpoint and stream the response, which is displayed in the chat interface as it arrives.
Finally, let’s write the contents of the file_upload.py file. This component is displayed on the landing page and allows us to upload a file for ingestion.
import reflex as rx
import os
import time
import requests
class UploadExample(rx.State):
uploading: bool = False
ingesting: bool = False
progress: int = 0
total_bytes: int = 0
ingestion_url = "http://127.0.0.1:8089/ingest"
async def handle_upload(self, files: list[rx.UploadFile]):
self.ingesting = True
yield
for file in files:
file_bytes = await file.read()
file_name = file.filename
            # Build the multipart payload without shadowing the loop's files argument.
            payload = {
                "file": (os.path.basename(file_name), file_bytes, "multipart/form-data")
            }
            response = requests.post(self.ingestion_url, files=payload)
self.ingesting = False
yield
if response.status_code == 200:
                # Ingestion succeeded; send the user to the chat page.
                yield rx.redirect("/chat")
def handle_upload_progress(self, progress: dict):
self.uploading = True
self.progress = round(progress["progress"] * 100)
if self.progress >= 100:
self.uploading = False
def cancel_upload(self):
self.uploading = False
return rx.cancel_upload("upload3")
def upload_form():
return rx.vstack(
rx.upload(
rx.flex(
rx.text(
"Drag and drop file here or click to select file",
font_family="Ubuntu",
),
rx.icon("upload", size=30),
direction="column",
align="center",
),
id="upload3",
border="1px solid rgb(233, 233,233, 0.4)",
margin="5em 0 10px 0",
background_color="rgb(107,99,246)",
border_radius="8px",
padding="1em",
),
rx.vstack(rx.foreach(rx.selected_files("upload3"), rx.text)),
rx.cond(
~UploadExample.ingesting,
rx.button(
"Upload",
on_click=UploadExample.handle_upload(
rx.upload_files(
upload_id="upload3",
on_upload_progress=UploadExample.handle_upload_progress,
),
),
),
rx.flex(
rx.spinner(size="3", loading=UploadExample.ingesting),
rx.button(
"Cancel",
on_click=UploadExample.cancel_upload,
),
align="center",
spacing="3",
),
),
rx.alert_dialog.root(
rx.alert_dialog.trigger(
rx.button("Continue to Chat", color_scheme="green"),
),
rx.alert_dialog.content(
rx.alert_dialog.title("Redirect to Chat Interface?"),
rx.alert_dialog.description(
"You will be redirected to the Chat Interface.",
size="2",
),
rx.flex(
rx.alert_dialog.cancel(
rx.button(
"Cancel",
variant="soft",
color_scheme="gray",
),
),
rx.alert_dialog.action(
rx.button(
"Continue",
color_scheme="green",
variant="solid",
on_click=rx.redirect("/chat"),
),
),
spacing="3",
margin_top="16px",
justify="end",
),
style={"max_width": 450},
),
),
align="center",
)
This component allows us to upload a file and ingest it into the vector store. It uses the ingest endpoint of our FastAPI app to upload and ingest the file. After ingestion, the user can simply move to the chat interface to ask queries.
With this, we have completed building the front-end for our application. Now we need to test the application with some documents.
Now let’s test the application on some manuals or documents. To use the application, we need to run both the back-end app and the Reflex app separately. Run the back-end app from its directory using the following command:
python app.py
Wait for the FastAPI app to start running. Then, in another terminal instance, run the front-end app using the following command:
reflex run
Once the apps are up and running, go to the front-end URL printed in the terminal to access the Reflex app. Initially, we land on the file upload page. Upload a file and press the Upload button. The file will be uploaded and ingested. This will take a while, depending on the document size and the device specs. Once it’s done, click on the ‘Continue to Chat’ button to move to the chat interface. Write your query and press Send.
In this two-part series, you’ve now built a complete and functional RAG application on a Raspberry Pi, from creating the core pipeline to wrapping it with a FastAPI back-end and developing a Reflex-based front-end. With these tools, your RAG pipeline is accessible and interactive, providing real-time query processing through a user-friendly web interface. By mastering these steps, you’ve gained valuable experience in building and deploying end-to-end applications on a compact, efficient platform. This setup opens the door to countless possibilities for deploying AI-driven applications on resource-constrained devices like the Raspberry Pi, making cutting-edge technology more accessible and practical for everyday use.
A. There is a platform named Tailscale that allows your devices to be connected to a private, secure network accessible only to you. You can add your Raspberry Pi and other devices to your Tailscale network and connect to the VPN to access your apps from anywhere in the world.
A. That constraint is due to the Raspberry Pi’s limited hardware specifications. This article is just an introductory tutorial on how to start building a RAG app using a Raspberry Pi and Ollama.