Wednesday, January 21, 2026

What is Late Chunking

What it does: Runs the entire document through the transformer first, then chunks the token embeddings (not the raw text).


The problem it solves: Traditional chunking loses long-distance context. Late chunking preserves full document context in each chunk’s embedding.


Conceptual example:


def late_chunk(text: str, chunk_size: int = 512) -> list:
    """Embed the full document BEFORE chunking (pseudocode)."""
    # Step 1: Embed the entire document (up to the model's 8192-token limit)
    full_doc_token_embeddings = transformer_embed(text)  # token-level embeddings

    # Step 2: Define chunk boundaries over the token sequence
    tokens = tokenize(text)
    chunk_boundaries = range(0, len(tokens), chunk_size)

    # Step 3: Pool the token embeddings for each chunk
    chunks_with_embeddings = []
    for start in chunk_boundaries:
        end = start + chunk_size
        chunk_text = detokenize(tokens[start:end])

        # Mean-pool token embeddings (each chunk keeps full-document context!)
        chunk_embedding = mean_pool(full_doc_token_embeddings[start:end])
        chunks_with_embeddings.append((chunk_text, chunk_embedding))

    return chunks_with_embeddings
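
For a runnable version, here is a minimal sketch assuming the long-context jinaai/jina-embeddings-v2-base-en model from Hugging Face; any transformer whose token-level hidden states you can read back works the same way:

import torch
from transformers import AutoModel, AutoTokenizer

# Long-context (8192-token) embedding model (assumption: jina-embeddings-v2)
MODEL_NAME = "jinaai/jina-embeddings-v2-base-en"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
model = AutoModel.from_pretrained(MODEL_NAME, trust_remote_code=True)

def late_chunk(text: str, chunk_size: int = 512) -> list:
    """Run the whole document through the model once, then pool per chunk."""
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=8192)
    with torch.no_grad():
        # Token-level embeddings for the full document: (seq_len, dim)
        token_embeddings = model(**inputs).last_hidden_state[0]

    input_ids = inputs["input_ids"][0]
    chunks = []
    for start in range(0, token_embeddings.shape[0], chunk_size):
        end = min(start + chunk_size, token_embeddings.shape[0])
        chunk_text = tokenizer.decode(input_ids[start:end], skip_special_tokens=True)
        # Mean-pool token embeddings computed with full-document attention
        chunk_embedding = token_embeddings[start:end].mean(dim=0)
        chunks.append((chunk_text, chunk_embedding))
    return chunks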




Late chunking in the context of embeddings for GenAI (Generative AI) is a strategy used when processing large documents or datasets for vector embeddings, particularly in RAG (Retrieval-Augmented Generation) workflows.


Here’s a clear breakdown:


Definition


Late chunking means delaying the splitting of content into smaller pieces (chunks) until after the content has been run through the embedding model.


Instead of splitting a large document into chunks before generating embeddings (which is early chunking), the model or system first generates embeddings for larger units (like full documents or sections) and then splits or processes them further later in the pipeline if needed.


Why use Late Chunking?


Preserves context


Early chunking may break semantic context by splitting sentences or paragraphs arbitrarily.


Late chunking allows embeddings to capture larger context, improving similarity searches.


Efficient processing


You can generate embeddings for larger units first and only create smaller chunks if retrieval or indexing requires it, reducing unnecessary computations.


Dynamic retrieval granularity


Allows flexible adjustment of chunk size later depending on how the embeddings will be queried or used in the application.


Comparison to Early Chunking

Feature               | Early Chunking                                | Late Chunking
When text is split    | Before embedding                              | After embedding or during retrieval
Context retention     | Lower (may lose semantic links across chunks) | Higher (larger context retained)
Processing efficiency | May generate more embeddings unnecessarily    | Can reduce embedding count
Use case              | Simple search or small documents              | Large documents, long-context GenAI applications


💡 Example Scenario:


A book with 1000 pages is to be used in a RAG application.


Early chunking: Split into 2-page chunks first → 500 embeddings.


Late chunking: Generate embeddings per chapter first (say, 20 chapters → 20 embeddings), then split chapters into smaller chunks later only if needed.


This approach balances context preservation and computational efficiency.

Tuesday, January 20, 2026

What is an example of Graphiti with Neo4j

from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType

# Initialize Graphiti (connects to Neo4j)
graphiti = Graphiti("neo4j://localhost:7687", "neo4j", "password")

async def ingest_document(text: str, source: str):
    """Ingest into knowledge graph"""
    # Graphiti automatically extracts entities and relationships
    await graphiti.add_episode(
        name=source,
        episode_body=text,
        source=EpisodeType.text,
        source_description=f"Document: {source}"
    )

async def search_knowledge_graph(query: str) -> str:
    """Hybrid search: semantic + keyword + graph"""
    # Graphiti combines:
    # - Semantic similarity (embeddings)
    # - BM25 keyword search
    # - Graph structure traversal
    # - Temporal context
    results = await graphiti.search(query=query, num_results=5)

    # Format graph results
    formatted = []
    for result in results:
        formatted.append(
            f"Entity: {result.node.name}\n"
            f"Type: {result.node.type}\n"
            f"Relationships: {result.relationships}"
        )

    return "\n---\n".join(formatted)





A Cross Encoder Example

from sentence_transformers import CrossEncoder

# Initialize once (the model is downloaded on first use)
reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

async def search_with_reranking(query: str, limit: int = 5) -> list:
    # Stage 1: Fast vector retrieval (fetch 4x candidates, capped at 20).
    # `embedder` and `db` are the application's embedding client and DB pool.
    candidate_limit = min(limit * 4, 20)
    query_embedding = await embedder.embed_query(query)

    # `<=>` is the pgvector cosine-distance operator
    # (assumption: Postgres with the pgvector extension)
    candidates = await db.query(
        "SELECT content, metadata FROM chunks ORDER BY embedding <=> $1 LIMIT $2",
        query_embedding, candidate_limit
    )

    # Stage 2: Re-rank with the cross-encoder
    pairs = [[query, row['content']] for row in candidates]
    scores = reranker.predict(pairs)

    # Sort by reranker scores and return the top N
    reranked = sorted(
        zip(candidates, scores),
        key=lambda x: x[1],
        reverse=True
    )[:limit]

    return [doc for doc, score in reranked]
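
To see what the reranker does in isolation (no database required), here is a self-contained scoring sketch; the query and passages are made up for illustration:

from sentence_transformers import CrossEncoder

reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

# Each pair is (query, passage); higher score = more relevant.
pairs = [
    ["what is late chunking", "Late chunking embeds the full document before splitting it."],
    ["what is late chunking", "Neo4j is a graph database."],
]
scores = reranker.predict(pairs)
print(scores)  # the first pair should score noticeably higher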

Saturday, January 17, 2026

How do AWS Config, Amazon Inspector, AWS Audit Manager and AWS Artifact work together?

These four AWS services (Audit Manager, Config, Inspector, and Artifact) all relate to security, compliance, and governance, but each serves a different purpose within that ecosystem.

Let’s break them down in a clear, structured way 👇


🧩 High-Level Summary

Service           | Primary Purpose                                                                         | Type
AWS Audit Manager | Continuously collects evidence and automates audit reporting for compliance frameworks | Compliance reporting tool
AWS Config        | Tracks configuration changes and checks AWS resources against compliance rules         | Configuration monitoring tool
Amazon Inspector  | Scans workloads for vulnerabilities and security issues                                 | Security assessment tool
AWS Artifact      | Provides on-demand access to AWS compliance reports and agreements                      | Compliance documentation portal

🧠 1. AWS Audit Manager

🔹 Purpose:

Helps you audit your AWS environment automatically to simplify compliance with frameworks like ISO 27001, GDPR, PCI-DSS, SOC 2, etc.

⚙️ How It Works:

  • Continuously collects evidence (data points) from AWS services (like Config, CloudTrail, IAM).

  • Maps them to control sets defined by compliance frameworks.

  • Generates audit-ready reports automatically.

📋 Key Features:

  • Prebuilt compliance frameworks and control mappings.

  • Automated evidence collection (no manual screenshots or data gathering).

  • Integration with AWS Organizations (multi-account audits).

  • Custom frameworks for internal governance.

🧭 Best For:

  • Compliance teams or auditors.

  • Organizations preparing for certifications or audits.

🧩 Example:

“Show me all evidence that IAM users require MFA.”
Audit Manager automatically gathers this proof over time.
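
As a rough illustration of pulling this data programmatically, a small boto3 sketch that lists active assessments and their frameworks (assumption: Audit Manager is enabled in the region and at least one assessment exists):

import boto3

auditmanager = boto3.client("auditmanager")

# List active assessments and the compliance framework each one maps to
resp = auditmanager.list_assessments(status="ACTIVE")
for assessment in resp.get("assessmentMetadata", []):
    print(assessment["name"], "-", assessment.get("complianceType", "custom"))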


⚙️ 2. AWS Config

🔹 Purpose:

Tracks and records configuration changes of AWS resources to ensure they remain compliant with desired settings or internal policies.

⚙️ How It Works:

  • Continuously records resource configurations (EC2, IAM, S3, VPC, etc.).

  • Allows you to define Config Rules (managed or custom using Lambda).

  • Detects non-compliant resources and triggers alerts or remediation.

📋 Key Features:

  • Real-time configuration tracking and history.

  • Compliance evaluation against internal or AWS standards.

  • Integration with CloudTrail and Security Hub.

🧭 Best For:

  • DevOps, security, and compliance teams wanting configuration drift detection.

  • Maintaining continuous resource compliance posture.

🧩 Example:

“Alert me if any S3 bucket becomes public.”
AWS Config continuously monitors and flags such violations.
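
For instance, a minimal boto3 sketch that pulls the resources currently failing the managed rule that flags publicly readable S3 buckets (assumption: the managed rule s3-bucket-public-read-prohibited is already deployed in the account):

import boto3

config = boto3.client("config")

# Fetch evaluation results for the rule, filtered to NON_COMPLIANT resources
resp = config.get_compliance_details_by_config_rule(
    ConfigRuleName="s3-bucket-public-read-prohibited",
    ComplianceTypes=["NON_COMPLIANT"],
)
for result in resp["EvaluationResults"]:
    qualifier = result["EvaluationResultIdentifier"]["EvaluationResultQualifier"]
    print("Non-compliant bucket:", qualifier["ResourceId"])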


🛡️ 3. Amazon Inspector

🔹 Purpose:

An automated vulnerability management service that scans workloads for security issues.

⚙️ How It Works:

  • Automatically discovers EC2 instances, container images (ECR), and Lambda functions.

  • Continuously scans for:

    • CVEs (Common Vulnerabilities and Exposures)

    • Misconfigurations

    • Software package vulnerabilities

  • Prioritizes findings by severity (CVSS score, exploitability).

📋 Key Features:

  • Continuous vulnerability scanning.

  • Agentless scanning for EC2 and container images.

  • Integration with AWS Security Hub, EventBridge, and Inspector dashboard.

  • Automatic remediation support.

🧭 Best For:

  • Security operations and compliance monitoring.

  • Continuous vulnerability assessment of compute resources.

🧩 Example:

“Detect and alert if any EC2 instance has a vulnerable OpenSSL version.”
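
A minimal boto3 sketch along those lines, listing the most severe Inspector findings (assumption: Inspector v2 is activated for the account):

import boto3

inspector = boto3.client("inspector2")

# List up to ten CRITICAL findings (e.g., vulnerable packages on EC2)
resp = inspector.list_findings(
    filterCriteria={
        "severity": [{"comparison": "EQUALS", "value": "CRITICAL"}],
    },
    maxResults=10,
)
for finding in resp.get("findings", []):
    print(finding["severity"], "-", finding["title"])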


📄 4. AWS Artifact

🔹 Purpose:

A self-service portal that provides AWS compliance reports, certifications, and agreements (e.g., SOC, ISO, PCI, GDPR).

⚙️ How It Works:

  • You access it from the AWS Console (no setup required).

  • Download third-party audit reports of AWS infrastructure.

  • Accept compliance agreements (e.g., Business Associate Addendum (BAA) for HIPAA).

📋 Key Features:

  • Central access to AWS’s own compliance evidence.

  • No cost; just authentication required.

  • Up-to-date compliance documentation and certifications.

🧭 Best For:

  • Compliance and legal teams.

  • Customers needing AWS compliance proof for audits.

🧩 Example:

“I need AWS’s SOC 2 Type II report to show my auditor.”
You download it directly from AWS Artifact.


⚖️ 5. Key Differences

Feature                 | AWS Audit Manager                     | AWS Config                                  | Amazon Inspector                            | AWS Artifact
Purpose                 | Automate collection of audit evidence | Monitor resource configurations             | Detect vulnerabilities                      | Provide AWS compliance reports
Focus Area              | Compliance automation                 | Configuration compliance                    | Security posture & CVE detection            | External compliance documentation
Scope                   | Organization-level audits             | Resource-level state                        | Instance, container, Lambda-level scanning  | AWS infrastructure compliance
Customization           | Custom frameworks                     | Custom Config rules                         | Custom scan targets                         | None (read-only portal)
Output                  | Audit reports, control evidence       | Compliance dashboard                        | Vulnerability findings                      | Downloadable reports (PDF)
Integration with Others | Uses Config, CloudTrail, IAM data     | Feeds data to Audit Manager, Security Hub   | Integrates with Security Hub                | Standalone portal
User Role               | Auditors & compliance officers        | DevOps/SecOps                               | Security engineers                          | Compliance/legal staff

🧩 6. How They Work Together

Here’s how they complement each other in a real compliance workflow:

  1. AWS Config → Monitors your resource configurations.

  2. Amazon Inspector → Scans for vulnerabilities in EC2, ECR, Lambda.

  3. AWS Audit Manager → Collects evidence from Config, Inspector, IAM, etc., and maps it to compliance controls (e.g., SOC 2).

  4. AWS Artifact → Provides the official AWS compliance documentation to share with auditors.

📊 Example Flow:

Config detects non-compliant S3 → Inspector detects a vulnerability → Audit Manager collects both as audit evidence → Artifact provides AWS’s ISO report for your compliance pack.


🧠 Simple Analogy

Service       | Analogy
Audit Manager | Your automated audit assistant (collects compliance evidence)
Config        | Your compliance monitor (tracks changes and deviations)
Inspector     | Your security scanner (finds vulnerabilities)
Artifact      | Your compliance library (stores AWS certifications and reports)


Wednesday, January 14, 2026

goroutine 5 - Using github.com/Jeffail/tunny (Third-party Library)

package main

import (
    "fmt"
    "sync"
    "time"

    "github.com/Jeffail/tunny"
)

func main() {
    const numTasks = 50

    // Create pool with 10 workers
    pool := tunny.NewFunc(10, func(payload interface{}) interface{} {
        taskID := payload.(int)
        // Simulate work
        time.Sleep(time.Millisecond * time.Duration(100+taskID*10))
        return fmt.Sprintf("Task %d completed", taskID)
    })
    defer pool.Close()

    // Submit tasks; pool.Process blocks until a worker picks the job up,
    // so a WaitGroup (rather than a fixed sleep) waits for completion.
    results := make([]interface{}, numTasks)
    var wg sync.WaitGroup
    for i := 1; i <= numTasks; i++ {
        wg.Add(1)
        go func(taskID int) {
            defer wg.Done()
            results[taskID-1] = pool.Process(taskID)
        }(i)
    }
    wg.Wait()

    // Count completed tasks
    completed := 0
    for _, result := range results {
        if result != nil {
            completed++
            fmt.Println(result.(string))
        }
    }
    fmt.Printf("\nSummary: %d out of %d tasks completed\n", completed, numTasks)
}

goroutine 4 - ErrGroup with Context and Worker Pool

package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/errgroup"
)

func workerPool(ctx context.Context, numWorkers, numTasks int) ([]string, error) {
    taskChan := make(chan int, numTasks)
    resultChan := make(chan string, numTasks)

    // Create worker pool
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(numWorkers)

    // Start workers
    for i := 0; i < numWorkers; i++ {
        g.Go(func() error {
            for {
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case taskID, ok := <-taskChan:
                    if !ok {
                        return nil
                    }
                    // Process task
                    time.Sleep(time.Millisecond * time.Duration(100+taskID*10))
                    resultChan <- fmt.Sprintf("Processed task %d", taskID)
                }
            }
        })
    }

    // Feed tasks
    go func() {
        for i := 1; i <= numTasks; i++ {
            taskChan <- i
        }
        close(taskChan)
    }()

    // Collect results
    var results []string
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for result := range resultChan {
            results = append(results, result)
        }
    }()

    // Wait for completion
    if err := g.Wait(); err != nil {
        return nil, err
    }
    close(resultChan)
    wg.Wait()

    return results, nil
}

func main() {
    ctx := context.Background()
    results, err := workerPool(ctx, 10, 50)
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }
    fmt.Printf("Summary: Completed %d tasks\n", len(results))
}

goroutine 3 - Buffered Channel as Semaphore (Classic Go Pattern)

package main

import (
    "fmt"
    "sync"
    "time"
)

func processTask(taskID int, sem chan struct{}, results chan<- string) {
    defer func() { <-sem }() // Release the semaphore
    // Simulate work
    time.Sleep(time.Millisecond * time.Duration(100+taskID*10))
    results <- fmt.Sprintf("Task %d completed", taskID)
}

func main() {
    const totalTasks = 50
    const maxConcurrency = 10

    sem := make(chan struct{}, maxConcurrency)
    results := make(chan string, totalTasks)
    var wg sync.WaitGroup

    // Launch tasks
    for i := 1; i <= totalTasks; i++ {
        wg.Add(1)
        go func(taskID int) {
            defer wg.Done()
            sem <- struct{}{} // Acquire semaphore (blocks at maxConcurrency)
            processTask(taskID, sem, results)
        }(i)
    }

    // Close results once all tasks are done
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results
    var completed []string
    for result := range results {
        completed = append(completed, result)
        fmt.Println(result)
    }

    fmt.Printf("\nSummary: Completed %d tasks with max %d concurrent workers\n",
        len(completed), maxConcurrency)
}