import logging
from pathlib import Path

import requests
from bs4 import BeautifulSoup, Tag
from langchain_core.documents import Document
from langchain_text_splitters import (
    HTMLSemanticPreservingSplitter,
    RecursiveCharacterTextSplitter,
)
from langchain_unstructured import UnstructuredLoader

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Headers used as semantic split points; each chunk carries the text of the
# matching header in its metadata under the given key.
headers_to_split_on = [
    ("h1", "Header 1"),
    ("h2", "Header 2"),
]


def code_handler(element: Tag) -> str:
    """Custom handler for <code> elements: keep the code text and tag it
    with the language declared in the element's data-lang attribute."""
    data_lang = element.get("data-lang")
    return f"<code:{data_lang}>{element.get_text()}</code>"


def process_web_sites(websites: list[str], chunk_size: int) -> list[Document]:
    """Fetch one or more websites and return a list of LangChain Documents,
    split along HTML headers while preserving tables, lists, and code."""
    if not websites:
        return []

    # The splitter does not depend on the page, so build it once.
    web_splitter = HTMLSemanticPreservingSplitter(
        headers_to_split_on=headers_to_split_on,
        separators=["\n\n", "\n", ". ", "! ", "? "],
        max_chunk_size=chunk_size,
        preserve_images=True,
        preserve_videos=True,
        elements_to_preserve=["table", "ul", "ol", "code"],
        denylist_tags=["script", "style", "head"],
        custom_handlers={"code": code_handler},
    )

    splits: list[Document] = []
    for url in websites:
        # Fetch the page; fail fast on HTTP errors instead of splitting an
        # error page.
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        # Parse the HTML so the splitter receives normalized markup.
        soup = BeautifulSoup(response.text, "html.parser")
        splits.extend(web_splitter.split_text(str(soup)))
    return splits


def process_local_files(
    local_paths: list[Path],
    chunk_size: int,
    chunk_overlap: int,
    add_start_index: bool,
) -> list[Document]:
    """Parse local PDF files (or directories of PDFs) and return a list of
    LangChain Documents split into overlapping character chunks."""
    # Collect all PDF paths, expanding directories one level deep.
    file_paths: list[Path] = []
    for path in local_paths:
        if path.is_dir():
            file_paths.extend(path.glob("*.pdf"))
        elif path.suffix == ".pdf":
            file_paths.append(path)
        else:
            logger.warning(f"Ignoring path {path} as it is not a PDF file.")

    # Parse the PDFs with Unstructured's high-resolution strategy.
    documents = []
    for file_path in file_paths:
        loader = UnstructuredLoader(file_path=file_path, strategy="hi_res")
        documents.extend(loader.lazy_load())

    # Split the parsed documents into chunks.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        add_start_index=add_start_index,
    )
    return text_splitter.split_documents(documents)
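

# --- Usage sketch (not part of the original module) ---
# A minimal example of driving both loaders end to end. The URL, directory,
# and chunking parameters below are illustrative assumptions, not values
# taken from the source.
if __name__ == "__main__":
    web_splits = process_web_sites(
        websites=["https://example.com/docs"],  # hypothetical URL
        chunk_size=1000,  # assumed chunk size
    )
    logger.info("Produced %d web splits.", len(web_splits))

    pdf_splits = process_local_files(
        local_paths=[Path("./pdfs")],  # hypothetical directory of PDFs
        chunk_size=1000,
        chunk_overlap=200,  # assumed overlap
        add_start_index=True,
    )
    logger.info("Produced %d PDF splits.", len(pdf_splits))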